You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:30 UTC
[rocketmq] 06/14: Add async response interface and implementation
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 36cec93387beb331ee47343df5d28f6fdfb992b7
Author: duhenglucky <du...@gmail.com>
AuthorDate: Fri Dec 21 22:22:42 2018 +0800
Add async response interface and implementation
---
.../protocol/header/SendMessageRequestHeader.java | 1 +
pom.xml | 4 +-
.../apache/rocketmq/remoting/RemotingServer.java | 2 +
.../remoting/transport/http2/Http2ServerImpl.java | 5 ++
.../transport/rocketmq/NettyRemotingServer.java | 22 ++++++++
.../org/apache/rocketmq/snode/SnodeController.java | 4 +-
.../SnodeConstant.java} | 25 ++-------
.../snode/processor/ConsumerManageProcessor.java | 4 +-
.../snode/processor/HearbeatProcessor.java | 7 +--
.../snode/processor/PullMessageProcessor.java | 23 +++++++--
.../snode/processor/SendMessageProcessor.java | 19 +++++--
.../rocketmq/snode/service/SnodeOuterService.java | 8 ++-
.../service/impl/SendTransferServiceImpl.java | 45 ----------------
.../snode/service/impl/SnodeOuterServiceImpl.java | 60 +++++++++++++---------
14 files changed, 116 insertions(+), 113 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
index 81e0cff..a032911 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SendMessageRequestHeader.java
@@ -50,6 +50,7 @@ public class SendMessageRequestHeader implements CommandCustomHeader {
private boolean unitMode = false;
@CFNullable
private boolean batch = false;
+
private Integer maxReconsumeTimes;
private String enodeAddr;
diff --git a/pom.xml b/pom.xml
index 47a7b68..08e438b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -100,8 +100,8 @@
<maven.test.skip>false</maven.test.skip>
<maven.javadoc.skip>true</maven.javadoc.skip>
<!-- Compiler settings properties -->
- <maven.compiler.source>1.7</maven.compiler.source>
- <maven.compiler.target>1.7</maven.compiler.target>
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
<sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
<!-- Exclude all generated code -->
<sonar.jacoco.itReportPath>${project.basedir}/../test/target/jacoco-it.exec</sonar.jacoco.itReportPath>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index 0d5ff38..a939a3a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
@@ -53,4 +54,5 @@ public interface RemotingServer extends RemotingService {
RemotingServer init(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
+ void sendResponse(final ChannelHandlerContext channel, RemotingCommand remotingCommand);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 6ff3d90..bc6a5c1 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -4,6 +4,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -240,4 +241,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
return this.channelEventListener;
}
+ @Override
+ public void sendResponse(ChannelHandlerContext channel, RemotingCommand remotingCommand) {
+
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index d167b49..5387de4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -344,4 +345,25 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
}
+
+ @Override
+ public void sendResponse(ChannelHandlerContext channel, RemotingCommand response) {
+ if (response != null) {
+ response.markResponseType();
+ try {
+ channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ log.error("processRequestWrapper response to {} failed",
+ future.channel().remoteAddress(), future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("processRequestWrapper process request over, but response failed", e);
+ log.error(response.toString());
+ }
+ }
+ }
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
index cb19bc9..50337df 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
@@ -135,9 +135,9 @@ public class SnodeController {
}
public void registerProcessor() {
- snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this.snodeOuterService), sendMessageExcutor);
+ snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, new SendMessageProcessor(this), sendMessageExcutor);
snodeServer.registerProcessor(RequestCode.HEART_BEAT, new HearbeatProcessor(this), heartbeatExecutor);
- snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this.snodeOuterService), pullMessageExcutor);
+ snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, new PullMessageProcessor(this), pullMessageExcutor);
snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, new ConsumerManageProcessor(this), consumerManagerExcutor);
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
similarity index 50%
copy from rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
copy to rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
index a165fd4..2ba91b2 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/constant/SnodeConstant.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.processor;/*
+package org.apache.rocketmq.snode.constant;/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,26 +15,11 @@ package org.apache.rocketmq.snode.processor;/*
* limitations under the License.
*/
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+public class SnodeConstant {
+ public static final long heartbeatTimeout = 3000;
-public class PullMessageProcessor implements NettyRequestProcessor {
+ public static final long oneWaytimeout = 10;
- private final SnodeOuterService snodeOuterService;
+ public static final long defaultTimeoutMills = 3000L;
- public PullMessageProcessor(SnodeOuterService snodeOuterService){
- this.snodeOuterService = snodeOuterService;
- }
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- return snodeOuterService.pullMessage(request);
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index 8509546..aec6f04 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -80,11 +80,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
response.setRemark(null);
return response;
} else {
- log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
+ log.warn("Get all client failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
} else {
- log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
+ log.warn("GetConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index f06af79..c26ed7c 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -1,4 +1,4 @@
-package org.apache.rocketmq.snode.processor;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,16 +14,13 @@ package org.apache.rocketmq.snode.processor;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+package org.apache.rocketmq.snode.processor;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.common.sysflag.TopicSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index a165fd4..6e474bc 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -16,21 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
*/
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.SnodeController;
public class PullMessageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private final SnodeOuterService snodeOuterService;
+ private final SnodeController snodeController;
- public PullMessageProcessor(SnodeOuterService snodeOuterService){
- this.snodeOuterService = snodeOuterService;
+ public PullMessageProcessor(SnodeController snodeController) {
+ this.snodeController = snodeController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- return snodeOuterService.pullMessage(request);
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().pullMessage(ctx, request);
+ responseFuture.whenComplete((data, ex) -> {
+ if (ex == null) {
+ this.snodeController.getSnodeServer().sendResponse(ctx, data);
+ } else {
+ log.error("Pull message error: {}", ex);
+ }
+ });
+ return null;
}
@Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index e419475..3f52ed3 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -16,25 +16,34 @@ package org.apache.rocketmq.snode.processor;/*
*/
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
+import org.apache.rocketmq.snode.SnodeController;
public class SendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
- private final SnodeOuterService snodeOuterService;
+ private final SnodeController snodeController;
- public SendMessageProcessor(final SnodeOuterService snodeOuterService) {
- this.snodeOuterService = snodeOuterService;
+ public SendMessageProcessor(final SnodeController snodeController) {
+ this.snodeController = snodeController;
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
- return snodeOuterService.sendMessage(request);
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getSnodeOuterService().sendMessage(request);
+ responseFuture.whenComplete((data, ex) -> {
+ if (ex == null) {
+ snodeController.getSnodeServer().sendResponse(ctx, data);
+ } else {
+ log.error("Send Message error: {}", ex);
+ }
+ });
+ return null;
}
@Override
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
index 8764228..a1ffdd8 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/SnodeOuterService.java
@@ -16,6 +16,9 @@ package org.apache.rocketmq.snode.service;/*
*/
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -27,9 +30,10 @@ import org.apache.rocketmq.snode.config.SnodeConfig;
public interface SnodeOuterService {
void sendHearbeat(RemotingCommand remotingCommand);
- RemotingCommand sendMessage(RemotingCommand remotingCommand);
+ CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
- RemotingCommand pullMessage(RemotingCommand remotingCommand);
+ CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+ final RemotingCommand remotingCommand);
void saveSubscriptionData(RemotingCommand remotingCommand);
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
deleted file mode 100644
index df589da..0000000
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SendTransferServiceImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.rocketmq.snode.service.impl;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.rocketmq.common.ServiceState;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.snode.service.SendTransferService;
-import org.apache.rocketmq.snode.service.SnodeOuterService;
-
-public class SendTransferServiceImpl implements SendTransferService {
- private ServiceState serviceState = ServiceState.CREATE_JUST;
- private SnodeOuterService snodeOuterService;
-
- public SendTransferServiceImpl(SnodeOuterService snodeOuterService) {
- snodeOuterService = snodeOuterService;
- }
-
- @Override
- public RemotingCommand sendMessage(RemotingCommand request) {
- return snodeOuterService.sendMessage(request);
- }
-
- @Override
- public boolean start() {
- return false;
- }
-
- @Override
- public void shutdown() {
-
- }
-}
diff --git a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
index 14e577e..1863f6b 100644
--- a/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
+++ b/rocketmq-snode/src/main/java/org/apache/rocketmq/snode/service/impl/SnodeOuterServiceImpl.java
@@ -16,11 +16,14 @@ package org.apache.rocketmq.snode.service.impl;/*
*/
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.CompleteFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -31,22 +34,23 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
-import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
+import org.apache.rocketmq.snode.constant.SnodeConstant;
import org.apache.rocketmq.snode.service.SnodeOuterService;
public class SnodeOuterServiceImpl implements SnodeOuterService {
@@ -58,7 +62,6 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
private static SnodeOuterServiceImpl snodeOuterService;
private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> enodeTable =
new ConcurrentHashMap<>();
- private final long defaultTimeoutMills = 3000L;
private SnodeOuterServiceImpl() {
@@ -97,7 +100,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
String enodeAddr = entry.getValue().get(MixAll.MASTER_ID);
if (enodeAddr != null) {
try {
- RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, defaultTimeoutMills);
+ RemotingCommand response = this.client.invokeSync(enodeAddr, remotingCommand, SnodeConstant.defaultTimeoutMills);
} catch (Exception ex) {
log.warn("Send heart beat faild:{} ,ex:{}", enodeAddr, ex);
}
@@ -106,27 +109,19 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
}
@Override
- public RemotingCommand sendMessage(RemotingCommand request) {
- try {
- SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
- RemotingCommand response =
- this.client.invokeSync(sendMessageRequestHeaderV2.getN(), request, defaultTimeoutMills);
- return response;
- } catch (Exception ex) {
- log.error("Send message async error:", ex);
- }
- return null;
- }
-
- @Override
- public RemotingCommand pullMessage(RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
+ RemotingCommand request) {
try {
final PullMessageRequestHeader requestHeader =
(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
- RemotingCommand remotingCommand = this.client.invokeSync(requestHeader.getEnodeAddr(), request, 20 * defaultTimeoutMills);
- log.info("Pull message response:{}", remotingCommand);
- log.info("Pull message response:{}", remotingCommand.getBody().length);
- return remotingCommand;
+ this.client.invokeAsync(requestHeader.getEnodeAddr(), request, SnodeConstant.defaultTimeoutMills, new InvokeCallback() {
+ @Override
+ public void operationComplete(ResponseFuture responseFuture) {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ snodeController.getSnodeServer().sendResponse(context, response);
+ }
+ });
+ return null;
} catch (Exception ex) {
log.error("pull message async error:", ex);
}
@@ -176,7 +171,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
synchronized (this) {
- ClusterInfo clusterInfo = getBrokerClusterInfo(defaultTimeoutMills);
+ ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.defaultTimeoutMills);
if (clusterInfo != null) {
HashMap<String, Set<String>> brokerAddrs = clusterInfo.getClusterAddrTable();
for (Map.Entry<String, Set<String>> entry : brokerAddrs.entrySet()) {
@@ -212,7 +207,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
for (String nameServer : nameServerAddressList) {
try {
- this.client.invokeSync(nameSrvAddr, remotingCommand, 3000L);
+ this.client.invokeSync(nameSrvAddr, remotingCommand, SnodeConstant.heartbeatTimeout);
} catch (Exception ex) {
log.warn("Register Snode to Nameserver addr: {} error, ex:{} ", nameServer, ex);
}
@@ -221,6 +216,21 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
}
@Override
+ public CompletableFuture<RemotingCommand> sendMessage(RemotingCommand request) {
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ try {
+ SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ this.client.invokeAsync(sendMessageRequestHeaderV2.getN(), request, SnodeConstant.defaultTimeoutMills, (responseFuture) -> {
+ future.complete(responseFuture.getResponseCommand());
+ });
+ } catch (Exception ex) {
+ log.error("Send message async error:{}", ex);
+ future.completeExceptionally(ex);
+ }
+ return future;
+ }
+
+ @Override
public void notifyConsumerIdsChanged(
final Channel channel,
final String consumerGroup) {
@@ -235,7 +245,7 @@ public class SnodeOuterServiceImpl implements SnodeOuterService {
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
- this.snodeController.getSnodeServer().invokeOneway(channel, request, 10);
+ this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.oneWaytimeout);
} catch (Exception e) {
log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
}