You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:30 UTC

[rocketmq] 06/14: Add async response interface and implementation

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 36cec93387beb331ee47343df5d28f6fdfb992b7
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Dec 21 22:22:42 2018 +0800

    Add async response interface and implementation
---
 .../protocol/header/SendMessageRequestHeader.java  |  1 +
 pom.xml                                            |  4 +-
 .../apache/rocketmq/remoting/RemotingServer.java   |  2 +
 .../remoting/transport/http2/Http2ServerImpl.java  |  5 ++
 .../transport/rocketmq/NettyRemotingServer.java    | 22 ++++++++
 .../org/apache/rocketmq/snode/SnodeController.java |  4 +-
 .../SnodeConstant.java}                            | 25 ++-------
 .../snode/processor/ConsumerManageProcessor.java   |  4 +-
 .../snode/processor/HearbeatProcessor.java         |  7 +--
 .../snode/processor/PullMessageProcessor.java      | 23 +++++++--
 .../snode/processor/SendMessageProcessor.java      | 19 +++++--
 .../rocketmq/snode/service/SnodeOuterService.java  |  8 ++-
 .../service/impl/SendTransferServiceImpl.java      | 45 ----------------
 .../snode/service/impl/SnodeOuterServiceImpl.java  | 60 +++++++++++++---------
 14 files changed, 116 insertions(+), 113 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 81e0cff..a032911 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -50,6 +50,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
     private boolean unitMode = false;
     @CFNullable
     private boolean batch = false;
+
     private Integer maxReconsumeTimes;
 
     private String enodeAddr;
diff --git a/pom.xml b/pom.xml
index 47a7b68..08e438b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,8 +100,8 @@
         <maven.test.skip>false</maven.test.skip>
         <maven.javadoc.skip>true</maven.javadoc.skip>
         <!-- Compiler settings properties -->
-        <maven.compiler.source>1.7</maven.compiler.source>
-        <maven.compiler.target>1.7</maven.compiler.target>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
         <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
         <!-- Exclude all generated code -->
         <sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 0d5ff38..a939a3a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.remoting;
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.remoting.common.Pair;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -53,4 +54,5 @@ public interface RemotingServer extends RemotingService {
 
     RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
 
+    void sendResponse(final ChannelHandlerContext channel, RemotingCommand remotingCommand);
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 6ff3d90..bc6a5c1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -4,6 +4,7 @@ import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
@@ -240,4 +241,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
         return this.channelEventListener;
     }
 
+    @Override
+    public void sendResponse(ChannelHandlerContext channel, RemotingCommand remotingCommand) {
+
+    }
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index d167b49..5387de4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -344,4 +345,25 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
     public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
 
     }
+
+    @Override
+    public void sendResponse(ChannelHandlerContext channel, RemotingCommand response) {
+        if (response != null) {
+            response.markResponseType();
+            try {
+                channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        if (!future.isSuccess()) {
+                            log.error("processRequestWrapper response to {} failed",
+                                future.channel().remoteAddress(), future.cause());
+                        }
+                    }
+                });
+            } catch (Throwable e) {
+                log.error("processRequestWrapper process request over, but response failed", e);
+                log.error(response.toString());
+            }
+        }
+    }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index cb19bc9..50337df 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -135,9 +135,9 @@ public class SnodeController {
     }
 
     public void registerProcessor() {
-        snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor);
+        snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this), sendMessageExcutor);
         snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
-        snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor);
+        snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this), pullMessageExcutor);
         snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
     }
 
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
similarity index 50%
copy from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
copy to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index a165fd4..2ba91b2 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.processor;/*
+package org.apache.rocketmq.snode.constant;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -15,26 +15,11 @@ package org.apache.rocketmq.snode.processor;/*
  * limitations under the License.
  */
 
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+public class SnodeConstant {
+    public static final long heartbeatTimeout = 3000;
 
-public class PullMessageProcessor implements NettyRequestProcessor {
+    public static final long oneWaytimeout = 10;
 
-    private final SnodeOuterService snodeOuterService;
+    public static final long defaultTimeoutMills = 3000L;
 
-    public PullMessageProcessor(SnodeOuterService snodeOuterService){
-        this.snodeOuterService = snodeOuterService;
-    }
-
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        return snodeOuterService.pullMessage(request);
-    }
-
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
 }
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 8509546..aec6f04 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -80,11 +80,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
                 response.setRemark(null);
                 return response;
             } else {
-                log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
+                log.warn("Get all client failed, {} {}", requestHeader.getConsumerGroup(),
                     RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             }
         } else {
-            log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
+            log.warn("GetConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
         }
 
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index f06af79..c26ed7c 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.processor;/*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -14,16 +14,13 @@ package org.apache.rocketmq.snode.processor;/*
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+package org.apache.rocketmq.snode.processor;
 import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index a165fd4..6e474bc 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -16,21 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
  */
 
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.SnodeController;
 
 public class PullMessageProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
 
-    private final SnodeOuterService snodeOuterService;
+    private final SnodeController snodeController;
 
-    public PullMessageProcessor(SnodeOuterService snodeOuterService){
-        this.snodeOuterService = snodeOuterService;
+    public PullMessageProcessor(SnodeController snodeController) {
+        this.snodeController = snodeController;
     }
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        return snodeOuterService.pullMessage(request);
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().pullMessage(ctx, request);
+        responseFuture.whenComplete((data, ex) -> {
+            if (ex == null) {
+                this.snodeController.getSnodeServer().sendResponse(ctx, data);
+            } else {
+                log.error("Pull message error: {}", ex);
+            }
+        });
+        return null;
     }
 
     @Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index e419475..3f52ed3 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -16,25 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
  */
 
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.SnodeController;
 
 public class SendMessageProcessor implements NettyRequestProcessor {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
 
-    private final SnodeOuterService snodeOuterService;
+    private final SnodeController snodeController;
 
-    public SendMessageProcessor(final SnodeOuterService snodeOuterService) {
-        this.snodeOuterService = snodeOuterService;
+    public SendMessageProcessor(final SnodeController snodeController) {
+        this.snodeController = snodeController;
     }
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        return snodeOuterService.sendMessage(request);
+        CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().sendMessage(request);
+        responseFuture.whenComplete((data, ex) -> {
+            if (ex == null) {
+                snodeController.getSnodeServer().sendResponse(ctx, data);
+            } else {
+                log.error("Send Message error: {}", ex);
+            }
+        });
+        return null;
     }
 
     @Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
index 8764228..a1ffdd8 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
@@ -16,6 +16,9 @@ package org.apache.rocketmq.snode.service;/*
  */
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
+import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -27,9 +30,10 @@ import org.apache.rocketmq.snode.config.SnodeConfig;
 public interface SnodeOuterService {
     void sendHearbeat(RemotingCommand remotingCommand);
 
-    RemotingCommand sendMessage(RemotingCommand remotingCommand);
+    CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
 
-    RemotingCommand pullMessage(RemotingCommand remotingCommand);
+    CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+        final RemotingCommand remotingCommand);
 
     void saveSubscriptionData(RemotingCommand remotingCommand);
 
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
deleted file mode 100644
index df589da..0000000
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.rocketmq.snode.service.impl;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.rocketmq.common.ServiceState;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SendTransferService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
-
-public class SendTransferServiceImpl implements SendTransferService {
-    private ServiceState serviceState = ServiceState.CREATE_JUST;
-    private SnodeOuterService snodeOuterService;
-
-    public SendTransferServiceImpl(SnodeOuterService snodeOuterService) {
-        snodeOuterService = snodeOuterService;
-    }
-
-    @Override
-    public RemotingCommand sendMessage(RemotingCommand request) {
-        return snodeOuterService.sendMessage(request);
-    }
-
-    @Override
-    public boolean start() {
-        return false;
-    }
-
-    @Override
-    public void shutdown() {
-
-    }
-}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
index 14e577e..1863f6b 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
@@ -16,11 +16,14 @@ package org.apache.rocketmq.snode.service.impl;/*
  */
 
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -31,22 +34,23 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
 import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.RemotingClientFactory;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.snode.SnodeController;
 import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
 import org.apache.rocketmq.snode.service.SnodeOuterService;
 
 public class SnodeOuterServiceImpl implements SnodeOuterService {
@@ -58,7 +62,6 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
     private static SnodeOuterServiceImpl snodeOuterService;
     private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
         new ConcurrentHashMap<>();
-    private final long defaultTimeoutMills = 3000L;
 
     private SnodeOuterServiceImpl() {
 
@@ -97,7 +100,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
             String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
             if (enodeAddr != null) {
                 try {
-                    RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills);
+                    RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
                 } catch (Exception ex) {
                     log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
                 }
@@ -106,27 +109,19 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
     }
 
     @Override
-    public RemotingCommand sendMessage(RemotingCommand request) {
-        try {
-            SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-            RemotingCommand response =
-                this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
-            return response;
-        } catch (Exception ex) {
-            log.error("Send message async error:", ex);
-        }
-        return null;
-    }
-
-    @Override
-    public RemotingCommand pullMessage(RemotingCommand request) {
+    public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+        RemotingCommand request) {
         try {
             final PullMessageRequestHeader requestHeader =
                 (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-            RemotingCommand remotingCommand =  this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills);
-            log.info("Pull message response:{}", remotingCommand);
-            log.info("Pull message response:{}", remotingCommand.getBody().length);
-            return remotingCommand;
+            this.client.invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.defaultTimeoutMills, new InvokeCallback() {
+                @Override
+                public void operationComplete(ResponseFuture responseFuture) {
+                    RemotingCommand response = responseFuture.getResponseCommand();
+                    snodeController.getSnodeServer().sendResponse(context, response);
+                }
+            });
+            return null;
         } catch (Exception ex) {
             log.error("pull message async error:", ex);
         }
@@ -176,7 +171,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
     public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         synchronized (this) {
-            ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills);
+            ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
             if (clusterInfo != null) {
                 HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
                 for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
@@ -212,7 +207,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
         if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
             for (String nameServer : nameServerAddressList) {
                 try {
-                    this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L);
+                    this.client.invokeSync(nameSrvAddr, remotingCommand, SnodeConstant.heartbeatTimeout);
                 } catch (Exception ex) {
                     log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
                 }
@@ -221,6 +216,21 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
     }
 
     @Override
+    public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            this.client.invokeAsync(sendMessageRequestHeaderV2.getN(), request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
+                future.complete(responseFuture.getResponseCommand());
+            });
+        } catch (Exception ex) {
+            log.error("Send message async error:{}", ex);
+            future.completeExceptionally(ex);
+        }
+        return future;
+    }
+
+    @Override
     public void notifyConsumerIdsChanged(
         final Channel channel,
         final String consumerGroup) {
@@ -235,7 +245,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
             RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
 
         try {
-            this.snodeController.getSnodeServer().invokeOneway(channel, request, 10);
+            this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
         } catch (Exception e) {
             log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
         }