You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/17 08:01:23 UTC

[rocketmq] branch develop updated: [ROCKETMQ-355][POLISH]Async send method polish - fix the timeout semantic (#318)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new bc0c04b  [ROCKETMQ-355][POLISH]Async send method polish - fix the timeout semantic (#318)
bc0c04b is described below

commit bc0c04bff0be7970cf603835826ef17e855694ba
Author: what-a-good-jungle <35...@qq.com>
AuthorDate: Tue Jul 17 16:01:20 2018 +0800

    [ROCKETMQ-355][POLISH]Async send method polish - fix the timeout semantic (#318)
---
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  15 +-
 .../impl/producer/DefaultMQProducerImpl.java       | 193 +++++++++++++++++----
 .../client/consumer/DefaultMQPushConsumerTest.java |   2 +-
 .../client/producer/DefaultMQProducerTest.java     |  69 +++++++-
 .../apache/rocketmq/remoting/RemotingClient.java   |   2 +
 .../remoting/netty/NettyRemotingAbstract.java      |   8 +-
 .../remoting/netty/NettyRemotingClient.java        |  20 ++-
 .../remoting/netty/NettyRemotingClientTest.java    |   8 +-
 .../producer/async/AsyncSendExceptionIT.java       |   6 +-
 9 files changed, 271 insertions(+), 52 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 3a023e3..1837204 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -308,6 +308,7 @@ public class MQClientAPIImpl {
         final SendMessageContext context,
         final DefaultMQProducerImpl producer
     ) throws RemotingException, MQBrokerException, InterruptedException {
+        long beginStartTime = System.currentTimeMillis();
         RemotingCommand request = null;
         if (sendSmartMsg || msg instanceof MessageBatch) {
             SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
@@ -324,11 +325,19 @@ public class MQClientAPIImpl {
                 return null;
             case ASYNC:
                 final AtomicInteger times = new AtomicInteger();
-                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
+                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTimeAsync) {
+                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
+                }
+                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                     retryTimesWhenSendFailed, times, context, producer);
                 return null;
             case SYNC:
-                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+                long costTimeSync = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTimeSync) {
+                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
+                }
+                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
             default:
                 assert false;
                 break;
@@ -2080,4 +2089,4 @@ public class MQClientAPIImpl {
             throw new MQClientException(response.getCode(), response.getRemark());
         }
     }
-}
\ No newline at end of file
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 4b5e373..e1d9f90 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -64,6 +64,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import java.util.concurrent.RejectedExecutionException;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -414,19 +416,48 @@ public class DefaultMQProducerImpl implements MQProducerInner {
      * DEFAULT ASYNC -------------------------------------------------------
      */
     public void send(Message msg,
-        SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
+                     SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
         send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, SendCallback sendCallback, long timeout)
+    /**
+     * It will be removed in 4.4.0 because of its exception handling and the wrong semantics of timeout.
+     * A new one will be provided in the next version.
+     * @param msg
+     * @param sendCallback
+     * @param timeout the maximum time, in milliseconds, within which the <code>sendCallback</code> will be invoked
+     * @throws MQClientException if the callback executor rejects the send task (wraps the RejectedExecutionException)
+     */
+    @Deprecated
+    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
+        final long beginStartTime = System.currentTimeMillis();
+        ExecutorService executor = this.getCallbackExecutor();
         try {
-            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    long costTime = System.currentTimeMillis() - beginStartTime;
+                    if (timeout > costTime) {
+                        try {
+                            sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
+                        } catch (Exception e) {
+                            sendCallback.onException(e);
+                        }
+                    } else {
+                        sendCallback.onException(
+                            new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
+                    }
+                }
+
+            });
+        } catch (RejectedExecutionException e) {
+            throw new MQClientException("executor rejected ", e);
         }
+
     }
 
+
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
         return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
     }
@@ -450,6 +481,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         long endTimestamp = beginTimestampFirst;
         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
         if (topicPublishInfo != null && topicPublishInfo.ok()) {
+            boolean callTimeout = false;
             MessageQueue mq = null;
             Exception exception = null;
             SendResult sendResult = null;
@@ -464,7 +496,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                     brokersSent[times] = mq.getBrokerName();
                     try {
                         beginTimestampPrev = System.currentTimeMillis();
-                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
+                        long costTime = beginTimestampPrev - beginTimestampFirst;
+                        if (timeout < costTime) {
+                            callTimeout = true;
+                            break;
+                        }
+
+                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                         endTimestamp = System.currentTimeMillis();
                         this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                         switch (communicationMode) {
@@ -546,6 +584,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
 
             MQClientException mqClientException = new MQClientException(info, exception);
+            if (callTimeout) {
+                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
+            }
+
             if (exception instanceof MQBrokerException) {
                 mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
             } else if (exception instanceof RemotingConnectException) {
@@ -587,11 +629,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     }
 
     private SendResult sendKernelImpl(final Message msg,
-        final MessageQueue mq,
-        final CommunicationMode communicationMode,
-        final SendCallback sendCallback,
-        final TopicPublishInfo topicPublishInfo,
-        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+                                      final MessageQueue mq,
+                                      final CommunicationMode communicationMode,
+                                      final SendCallback sendCallback,
+                                      final TopicPublishInfo topicPublishInfo,
+                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        long beginStartTime = System.currentTimeMillis();
         String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
         if (null == brokerAddr) {
             tryToFindTopicPublishInfo(mq.getTopic());
@@ -691,13 +734,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                             tmpMessage = MessageAccessor.cloneMessage(msg);
                             msg.setBody(prevBody);
                         }
-
+                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
+                        if (timeout < costTimeAsync) {
+                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
+                        }
                         sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                             brokerAddr,
                             mq.getBrokerName(),
                             tmpMessage,
                             requestHeader,
-                            timeout,
+                            timeout - costTimeAsync,
                             communicationMode,
                             sendCallback,
                             topicPublishInfo,
@@ -708,12 +754,16 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                         break;
                     case ONEWAY:
                     case SYNC:
+                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
+                        if (timeout < costTimeSync) {
+                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
+                        }
                         sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                             brokerAddr,
                             mq.getBrokerName(),
                             msg,
                             requestHeader,
-                            timeout,
+                            timeout - costTimeSync,
                             communicationMode,
                             context,
                             this);
@@ -844,6 +894,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
 
     public SendResult send(Message msg, MessageQueue mq, long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        long beginStartTime = System.currentTimeMillis();
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -851,6 +902,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
             throw new MQClientException("message's topic not equal mq's topic", null);
         }
 
+        long costTime = System.currentTimeMillis() - beginStartTime;
+        if (timeout < costTime) {
+            throw new RemotingTooMuchRequestException("call timeout");
+        }
+
         return this.sendKernelImpl(msg, mq, CommunicationMode.SYNC, null, null, timeout);
     }
 
@@ -862,20 +918,55 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         send(msg, mq, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueue mq, SendCallback sendCallback, long timeout)
+    /**
+     * It will be removed in 4.4.0 because of its exception handling and the wrong semantics of timeout.
+     * A new one will be provided in the next version.
+     * @param msg
+     * @param mq
+     * @param sendCallback
+     * @param timeout the maximum time, in milliseconds, within which the <code>sendCallback</code> will be invoked
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    @Deprecated
+    public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
-        this.makeSureStateOK();
-        Validators.checkMessage(msg, this.defaultMQProducer);
+        final long beginStartTime = System.currentTimeMillis();
+        ExecutorService executor = this.getCallbackExecutor();
+        try {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        makeSureStateOK();
+                        Validators.checkMessage(msg, defaultMQProducer);
 
-        if (!msg.getTopic().equals(mq.getTopic())) {
-            throw new MQClientException("message's topic not equal mq's topic", null);
-        }
+                        if (!msg.getTopic().equals(mq.getTopic())) {
+                            throw new MQClientException("message's topic not equal mq's topic", null);
+                        }
+                        long costTime = System.currentTimeMillis() - beginStartTime;
+                        if (timeout > costTime) {
+                            try {
+                                sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null,
+                                    timeout - costTime);
+                            } catch (MQBrokerException e) {
+                                throw new MQClientException("unknown exception", e);
+                            }
+                        } else {
+                            sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+                        }
+                    } catch (Exception e) {
+                        sendCallback.onException(e);
+                    }
 
-        try {
-            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, sendCallback, null, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknown exception", e);
+                }
+
+            });
+        } catch (RejectedExecutionException e) {
+            throw new MQClientException("executor rejected ", e);
         }
+
     }
 
     /**
@@ -913,6 +1004,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         final CommunicationMode communicationMode,
         final SendCallback sendCallback, final long timeout
     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        long beginStartTime = System.currentTimeMillis();
         this.makeSureStateOK();
         Validators.checkMessage(msg, this.defaultMQProducer);
 
@@ -925,8 +1017,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
                 throw new MQClientException("select message queue throwed exception.", e);
             }
 
+            long costTime = System.currentTimeMillis() - beginStartTime;
+            if (timeout < costTime) {
+                throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
+            }
             if (mq != null) {
-                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout);
+                return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
             } else {
                 throw new MQClientException("select message queue return null.", null);
             }
@@ -943,12 +1039,47 @@ public class DefaultMQProducerImpl implements MQProducerInner {
         send(msg, selector, arg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
     }
 
-    public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback, long timeout)
+    /**
+     * It will be removed in 4.4.0 because of its exception handling and the wrong semantics of timeout.
+     * A new one will be provided in the next version.
+     * @param msg
+     * @param selector
+     * @param arg
+     * @param sendCallback
+     * @param timeout the maximum time, in milliseconds, within which the <code>sendCallback</code> will be invoked
+     * @throws MQClientException
+     * @throws RemotingException
+     * @throws InterruptedException
+     */
+    @Deprecated
+    public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
         throws MQClientException, RemotingException, InterruptedException {
+        final long beginStartTime = System.currentTimeMillis();
+        ExecutorService executor = this.getCallbackExecutor();
         try {
-            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback, timeout);
-        } catch (MQBrokerException e) {
-            throw new MQClientException("unknownn exception", e);
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    long costTime = System.currentTimeMillis() - beginStartTime;
+                    if (timeout > costTime) {
+                        try {
+                            try {
+                                sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, sendCallback,
+                                    timeout - costTime);
+                            } catch (MQBrokerException e) {
+                                throw new MQClientException("unknownn exception", e);
+                            }
+                        } catch (Exception e) {
+                            sendCallback.onException(e);
+                        }
+                    } else {
+                        sendCallback.onException(new RemotingTooMuchRequestException("call timeout"));
+                    }
+                }
+
+            });
+        } catch (RejectedExecutionException e) {
+            throw new MQClientException("exector rejected ", e);
         }
     }
 
@@ -1082,6 +1213,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
     public void setCallbackExecutor(final ExecutorService callbackExecutor) {
         this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
     }
+    public ExecutorService getCallbackExecutor() {
+        return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+
+    }
 
     public SendResult send(Message msg,
         long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index d6dce86..ff2fb78 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -289,4 +289,4 @@ public class DefaultMQPushConsumerTest {
         }
         return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
     }
-}
\ No newline at end of file
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index d3c6cc8..c225afd 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -24,6 +24,9 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -36,6 +39,7 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
@@ -65,6 +69,8 @@ public class DefaultMQProducerTest {
     private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
     @Mock
     private MQClientAPIImpl mQClientAPIImpl;
+    @Mock
+    private NettyRemotingClient nettyRemotingClient;
 
     private DefaultMQProducer producer;
     private Message message;
@@ -161,38 +167,91 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+        when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
         producer.send(message, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
                 assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
                 assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+                countDownLatch.countDown();
             }
 
             @Override
             public void onException(Throwable e) {
+                countDownLatch.countDown();
             }
         });
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        callbackExecutor.shutdown();
+    }
+    @Test
+    public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
+        final AtomicInteger cc = new AtomicInteger(0);
+        final CountDownLatch countDownLatch = new CountDownLatch(6);
+        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+        when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+        when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
+
+        SendCallback sendCallback = new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+            }
 
+            @Override
+            public void onException(Throwable e) {
+                e.printStackTrace();
+                cc.incrementAndGet();
+                countDownLatch.countDown();
+            }
+        };
+        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                return null;
+            }
+        };
+
+        Message message = new Message();
+        message.setTopic("test");
+        message.setBody("hello world".getBytes());
+        producer.send(new Message(),sendCallback);
+        producer.send(message,sendCallback,1000);
+        producer.send(message,new MessageQueue(),sendCallback);
+        producer.send(new Message(),new MessageQueue(),sendCallback,1000);
+        producer.send(new Message(),messageQueueSelector,null,sendCallback);
+        producer.send(message,messageQueueSelector,null,sendCallback,1000);
+
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        callbackExecutor.shutdown();
+        assertThat(cc.get()).isEqualTo(6);
     }
 
     @Test
     public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
-        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
+        when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
         producer.send(bigMessage, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
                 assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
                 assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
+                countDownLatch.countDown();
             }
 
             @Override
             public void onException(Throwable e) {
+                countDownLatch.countDown();
             }
         });
-
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        callbackExecutor.shutdown();
     }
 
     @Test
@@ -249,7 +308,7 @@ public class DefaultMQProducerTest {
 
     @Test
     public void testSetCallbackExecutor() throws MQClientException {
-        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
+        String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis();
         producer = new DefaultMQProducer(producerGroupTemp);
         producer.setNamesrvAddr("127.0.0.1:9876");
         producer.start();
@@ -319,4 +378,4 @@ public class DefaultMQProducerTest {
         }
         return assertionErrors[0];
     }
-}
\ No newline at end of file
+}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 2aea14c..c0754db 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -48,5 +48,7 @@ public interface RemotingClient extends RemotingService {
 
     void setCallbackExecutor(final ExecutorService callbackExecutor);
 
+    ExecutorService getCallbackExecutor();
+
     boolean isChannelWritable(final String addr);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 45ca730..8dccebc 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -403,11 +403,17 @@ public abstract class NettyRemotingAbstract {
     public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
         final InvokeCallback invokeCallback)
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        long beginStartTime = System.currentTimeMillis();
         final int opaque = request.getOpaque();
         boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, invokeCallback, once);
+            long costTime = System.currentTimeMillis() - beginStartTime;
+            if (timeoutMillis < costTime) {
+                throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
+            }
+
+            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
             this.responseTable.put(opaque, responseFuture);
             try {
                 channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 241f2b0..33c2eed 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -360,13 +360,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     @Override
     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+        long beginStartTime = System.currentTimeMillis();
         final Channel channel = this.getAndCreateChannel(addr);
         if (channel != null && channel.isActive()) {
             try {
                 if (this.rpcHook != null) {
                     this.rpcHook.doBeforeRequest(addr, request);
                 }
-                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTime) {
+                    throw new RemotingTimeoutException("invokeSync call timeout");
+                }
+                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
                 if (this.rpcHook != null) {
                     this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                 }
@@ -390,8 +395,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     }
 
     private Channel getAndCreateChannel(final String addr) throws InterruptedException {
-        if (null == addr)
+        if (null == addr) {
             return getAndCreateNameserverChannel();
+        }
 
         ChannelWrapper cw = this.channelTables.get(addr);
         if (cw != null && cw.isOK()) {
@@ -431,8 +437,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                         this.namesrvAddrChoosed.set(newAddr);
                         log.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
                         Channel channelNew = this.createChannel(newAddr);
-                        if (channelNew != null)
+                        if (channelNew != null) {
                             return channelNew;
+                        }
                     }
                 }
             } catch (Exception e) {
@@ -511,13 +518,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
         throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
         RemotingSendRequestException {
+        long beginStartTime = System.currentTimeMillis();
         final Channel channel = this.getAndCreateChannel(addr);
         if (channel != null && channel.isActive()) {
             try {
                 if (this.rpcHook != null) {
                     this.rpcHook.doBeforeRequest(addr, request);
                 }
-                this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+                long costTime = System.currentTimeMillis() - beginStartTime;
+                if (timeoutMillis < costTime) {
+                    throw new RemotingTooMuchRequestException("invokeAsync call timeout");
+                }
+                this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
             } catch (RemotingSendRequestException e) {
                 log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
                 this.closeChannel(addr, channel);
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 04a3beb..6b5633d 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -30,14 +30,10 @@ public class NettyRemotingClientTest {
     private NettyRemotingClient remotingClient = new NettyRemotingClient(new NettyClientConfig());
 
     @Test
-    public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {
-        Field field = NettyRemotingClient.class.getDeclaredField("publicExecutor");
-        field.setAccessible(true);
-        assertThat(remotingClient.getCallbackExecutor()).isEqualTo(field.get(remotingClient));
-
+    public void testSetCallbackExecutor() throws NoSuchFieldException, IllegalAccessException {        
         ExecutorService customized = Executors.newCachedThreadPool();
         remotingClient.setCallbackExecutor(customized);
 
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
     }
-}
\ No newline at end of file
+}
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
index 0bad6ea..d1a1fd1 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/async/AsyncSendExceptionIT.java
@@ -61,7 +61,7 @@ public class AsyncSendExceptionIT extends BaseConf {
         producer.send(msg, sendCallback);
     }
 
-    @Test(expected = java.lang.NullPointerException.class)
+    @Test
     public void testSendMQNull() throws Exception {
         Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
@@ -69,7 +69,7 @@ public class AsyncSendExceptionIT extends BaseConf {
         producer.send(msg, messageQueue, SendCallBackFactory.getSendCallBack());
     }
 
-    @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
+    @Test
     public void testSendSelectorNull() throws Exception {
         Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);
@@ -77,7 +77,7 @@ public class AsyncSendExceptionIT extends BaseConf {
         producer.send(msg, selector, 100, SendCallBackFactory.getSendCallBack());
     }
 
-    @Test(expected = org.apache.rocketmq.client.exception.MQClientException.class)
+    @Test
     public void testSelectorThrowsException() throws Exception {
         Message msg = new Message(topic, RandomUtils.getStringByUUID().getBytes());
         DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr);