You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:36 UTC
[rocketmq] 12/14: Polish snode related RemotingChannel implementation
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit db97b4063888e85a9d2f39246dc9f71ebde75ad0
Author: duhenglucky <du...@gmail.com>
AuthorDate: Wed Jan 2 14:51:10 2019 +0800
Polish snode related RemotingChannel implementation
---
.../rocketmq/remoting/RemotingClientFactory.java | 16 ++++++++
.../apache/rocketmq/remoting/RemotingServer.java | 4 +-
.../rocketmq/remoting/RemotingServerFactory.java | 16 ++++++++
.../netty/NettyChannelHandlerContextImpl.java | 43 ++++++++++++++--------
.../rocketmq/remoting/netty/NettyChannelImpl.java | 20 +++++++++-
.../remoting/serialize/MsgPackSerializable.java | 21 +++++++++--
.../remoting/serialize/SerializerFactory.java | 16 ++++++++
.../transport/NettyRemotingServerAbstract.java | 16 ++++++++
.../remoting/transport/http2/Http2ClientImpl.java | 16 ++++++++
.../remoting/transport/http2/Http2ServerImpl.java | 20 ++++++++--
.../transport/rocketmq/NettyRemotingServer.java | 23 ------------
.../org/apache/rocketmq/snode/SnodeStartup.java | 6 ++-
.../snode/processor/ConsumerManageProcessor.java | 39 +++++++++-----------
.../snode/processor/HearbeatProcessor.java | 22 ++++-------
.../snode/processor/PullMessageProcessor.java | 12 +-----
.../snode/processor/SendMessageProcessor.java | 3 +-
.../rocketmq/snode/service/EnodeService.java | 4 +-
.../snode/service/impl/EnodeServiceImpl.java | 6 +--
.../snode/service/impl/NnodeServiceImpl.java | 2 -
19 files changed, 197 insertions(+), 108 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index 825d96b..4df2a70 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting;
import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
index c773e5a..a37b538 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java
@@ -48,7 +48,5 @@ public interface RemotingServer extends RemotingService {
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
- RemotingServer init(ServerConfig nettyServerConfig, ChannelEventListener channelEventListener);
-
- void sendResponse(final RemotingChannel remotingChannel, RemotingCommand remotingCommand);
+ RemotingServer init(ServerConfig serverConfig, ChannelEventListener channelEventListener);
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index a2cdf2b..6dbf22a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting;
import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
index 673c12a..a00ef05 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelHandlerContextImpl.java
@@ -1,19 +1,16 @@
-package org.apache.rocketmq.remoting.netty;/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
+package org.apache.rocketmq.remoting.netty;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
@@ -69,8 +66,24 @@ public class NettyChannelHandlerContextImpl implements RemotingChannel {
}
@Override
- public void reply(final RemotingCommand command) {
- channelHandlerContext.writeAndFlush(command);
+ public void reply(final RemotingCommand response) {
+ if (response != null) {
+ response.markResponseType();
+ try {
+ this.channelHandlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ log.error("processRequestWrapper response to {} failed",
+ future.channel().remoteAddress(), future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("processRequestWrapper process request over, but response failed", e);
+ log.error(response.toString());
+ }
+ }
}
public ChannelHandlerContext getChannelHandlerContext() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
index 86436c1..fe33a52 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyChannelImpl.java
@@ -71,8 +71,24 @@ public class NettyChannelImpl implements RemotingChannel {
}
@Override
- public void reply(final RemotingCommand command) {
- channel.writeAndFlush(command);
+ public void reply(final RemotingCommand response) {
+ if (response != null) {
+ response.markResponseType();
+ try {
+ channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (!future.isSuccess()) {
+ log.error("ProcessRequestWrapper response to {} failed",
+ future.channel().remoteAddress(), future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("ProcessRequestWrapper process request over, but response failed", e);
+ log.error(response.toString());
+ }
+ }
}
public io.netty.channel.Channel getChannel() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
index 6c7ba78..4bf1896 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/MsgPackSerializable.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting.serialize;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -6,10 +22,7 @@ import org.msgpack.MessagePack;
public class MsgPackSerializable implements Serializer {
private final MessagePack messagePack = new MessagePack();
-// public MsgPackSerializable(){
-// messagePack.register(LanguageCode.class);
-// messagePack.register(SerializeType.class);
-// }
+
@Override
public SerializeType type() {
return SerializeType.MSGPACK;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
index e015478..1330b39 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/serialize/SerializerFactory.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting.serialize;
import java.util.Map;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
index cec0086..5ef9783 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting.transport;
import io.netty.channel.Channel;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
index e37f354..71bee1e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting.transport.http2;
import io.netty.bootstrap.Bootstrap;
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
index 8e5e4c7..649055b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.rocketmq.remoting.transport.http2;
import io.netty.bootstrap.ServerBootstrap;
@@ -244,8 +260,4 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
return this.channelEventListener;
}
- @Override
- public void sendResponse(RemotingChannel channel, RemotingCommand remotingCommand) {
-
- }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
index 2bd397c..89b9820 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java
@@ -350,27 +350,4 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
public void push(String addr, String sessionId, RemotingCommand remotingCommand) {
}
-
- @Override
- public void sendResponse(RemotingChannel remotingChannel, RemotingCommand response) {
- NettyChannelHandlerContextImpl channelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
- ChannelHandlerContext ctx = channelHandlerContext.getChannelHandlerContext();
- if (response != null) {
- response.markResponseType();
- try {
- ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- log.error("processRequestWrapper response to {} failed",
- future.channel().remoteAddress(), future.cause());
- }
- }
- });
- } catch (Throwable e) {
- log.error("processRequestWrapper process request over, but response failed", e);
- log.error(response.toString());
- }
- }
- }
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
index 8ccb34e..f745ede 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
@@ -102,6 +102,10 @@ public class SnodeStartup {
in.close();
}
}
+ if (null == snodeConfig.getRocketmqHome()) {
+ System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
+ System.exit(-2);
+ }
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
@@ -135,7 +139,7 @@ public class SnodeStartup {
}
private static Options buildCommandlineOptions(final Options options) {
- Option opt = new Option("c", "configFile", true, "Broker config properties file");
+ Option opt = new Option("c", "configFile", true, "SNode config properties file");
opt.setRequired(false);
options.addOption(opt);
return options;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
index bc58a5c..edb4b58 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.processor;
-import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -34,13 +33,12 @@ import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHe
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
+import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.RequestProcessor;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
@@ -57,23 +55,20 @@ public class ConsumerManageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws InterruptedException, RemotingTimeoutException,
- RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
- NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
- ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
-
+ RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
- return this.getConsumerListByGroup(ctx, request);
+ return this.getConsumerListByGroup(remotingChannel, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
- return this.updateConsumerOffset(ctx, request);
+ return this.updateConsumerOffset(remotingChannel, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
- return this.queryConsumerOffset(ctx, request);
+ return this.queryConsumerOffset(remotingChannel, request);
case RequestCode.SEARCH_OFFSET_BY_TIMESTAMP:
- return searchOffsetByTimestamp(ctx, request);
+ return searchOffsetByTimestamp(remotingChannel, request);
case RequestCode.GET_MAX_OFFSET:
- return getMaxOffset(ctx, request);
+ return getMaxOffset(remotingChannel, request);
case RequestCode.GET_MIN_OFFSET:
- return getMinOffset(ctx, request);
+ return getMinOffset(remotingChannel, request);
default:
break;
}
@@ -85,7 +80,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
return false;
}
- public RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
+ public RemotingCommand searchOffsetByTimestamp(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader) request
@@ -98,7 +93,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
return null;
}
- public RemotingCommand getMinOffset(ChannelHandlerContext ctx,
+ public RemotingCommand getMinOffset(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
final GetMinOffsetRequestHeader requestHeader =
(GetMinOffsetRequestHeader) request
@@ -111,7 +106,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
return null;
}
- public RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
+ public RemotingCommand getMaxOffset(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
final GetMaxOffsetRequestHeader requestHeader =
(GetMaxOffsetRequestHeader) request
@@ -124,7 +119,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
return null;
}
- public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
+ public RemotingCommand getConsumerListByGroup(RemotingChannel remotingChannel, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
@@ -146,11 +141,11 @@ public class ConsumerManageProcessor implements RequestProcessor {
return response;
} else {
log.warn("GetAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
}
} else {
log.warn("GetConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()));
}
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -158,21 +153,21 @@ public class ConsumerManageProcessor implements RequestProcessor {
return response;
}
- private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+ private RemotingCommand updateConsumerOffset(RemotingChannel remotingChannel, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
final UpdateConsumerOffsetRequestHeader requestHeader =
(UpdateConsumerOffsetRequestHeader) request
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
- this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
+ this.snodeController.getConsumerOffsetManager().commitOffset(requestHeader.getEnodeName(), RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress()), requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
- private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
+ private RemotingCommand queryConsumerOffset(RemotingChannel remotingChannel, RemotingCommand request)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, RemotingCommandException {
final RemotingCommand response =
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
index c69ba5d..829ad0e 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HearbeatProcessor.java
@@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.snode.processor;
-import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -29,11 +28,8 @@ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
-import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.RequestProcessor;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ClientChannelInfo;
@@ -49,23 +45,21 @@ public class HearbeatProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws Exception {
- NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl)remotingChannel;
- ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
- return heartbeat(ctx, request);
+ return heartbeat(remotingChannel, request);
case RequestCode.UNREGISTER_CLIENT:
- return unregister(ctx, request);
+ return unregister(remotingChannel, request);
default:
break;
}
return null;
}
- private RemotingCommand heartbeat(ChannelHandlerContext ctx, RemotingCommand request) {
+ private RemotingCommand heartbeat(RemotingChannel remotingChannel, RemotingCommand request) {
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- new NettyChannelImpl(ctx.channel()),
+ remotingChannel,
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
@@ -100,7 +94,7 @@ public class HearbeatProcessor implements RequestProcessor {
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
- RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ RemotingHelper.parseChannelRemoteAddr(remotingChannel.remoteAddress())
);
}
}
@@ -111,14 +105,14 @@ public class HearbeatProcessor implements RequestProcessor {
return response;
}
- private RemotingCommand unregister(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
+ private RemotingCommand unregister(RemotingChannel remotingChannel, RemotingCommand request) throws Exception {
final RemotingCommand response =
RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
final UnregisterClientRequestHeader requestHeader =
(UnregisterClientRequestHeader) request.decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- new NettyChannelImpl(ctx.channel()),
+ remotingChannel,
requestHeader.getClientID(),
request.getLanguage(),
request.getVersion());
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
index 951d175..645d7fc 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.processor;/*
* limitations under the License.
*/
-import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -30,10 +29,6 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.client.ConsumerGroupInfo;
@@ -50,9 +45,6 @@ public class PullMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
- NettyChannelHandlerContextImpl nettyChannelHandlerContext = (NettyChannelHandlerContextImpl) remotingChannel;
- ChannelHandlerContext ctx = nettyChannelHandlerContext.getChannelHandlerContext();
-
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageRequestHeader requestHeader =
@@ -97,10 +89,10 @@ public class PullMessageProcessor implements RequestProcessor {
return response;
}
- CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(ctx, request);
+ CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().pullMessage(request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
- this.snodeController.getSnodeServer().sendResponse(remotingChannel, data);
+ remotingChannel.reply(data);
} else {
log.error("Pull message error: {}", ex);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
index 6641b2a..bd18339 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.processor;/*
* limitations under the License.
*/
-import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -39,7 +38,7 @@ public class SendMessageProcessor implements RequestProcessor {
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(request);
responseFuture.whenComplete((data, ex) -> {
if (ex == null) {
- snodeController.getSnodeServer().sendResponse(remotingChannel, data);
+ remotingChannel.reply(data);
} else {
log.error("Send Message error: {}", ex);
}
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
index add1cde..4c3f894 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
@@ -15,7 +15,6 @@ package org.apache.rocketmq.snode.service;/*
* limitations under the License.
*/
-import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
@@ -32,8 +31,7 @@ public interface EnodeService {
CompletableFuture<RemotingCommand> sendMessage(final RemotingCommand request);
- CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
- final RemotingCommand remotingCommand);
+ CompletableFuture<RemotingCommand> pullMessage(final RemotingCommand request);
void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
index cd4a519..20a4d2f 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.serialize.RemotingSerializable;
@@ -81,8 +82,7 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
- public CompletableFuture<RemotingCommand> pullMessage(final ChannelHandlerContext context,
- RemotingCommand request) {
+ public CompletableFuture<RemotingCommand> pullMessage(RemotingCommand request) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
@@ -106,7 +106,7 @@ public class EnodeServiceImpl implements EnodeService {
}
});
} catch (Exception ex) {
- log.error("pull message async error:", ex);
+ log.error("Pull message async error:", ex);
future.completeExceptionally(ex);
}
return future;
diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
index fe28571..b272c31 100644
--- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
+++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java
@@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,7 +38,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.constant.SnodeConstant;
-import org.apache.rocketmq.snode.exception.SnodeException;
import org.apache.rocketmq.snode.service.NnodeService;
public class NnodeServiceImpl implements NnodeService {