You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:31 UTC
[14/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
new file mode 100644
index 0000000..b8c1bb0
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTimeoutException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Remoting exception raised when waiting for something on a channel exceeds its time budget.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingTimeoutException extends RemotingException {
+
+    private static final long serialVersionUID = 4106899185095245979L;
+
+
+    /**
+     * Creates the exception with a caller-supplied message.
+     *
+     * @param message the detail message, passed to the superclass unchanged
+     */
+    public RemotingTimeoutException(String message) {
+        super(message);
+    }
+
+
+    /**
+     * Creates the exception for a timed-out wait on the channel to {@code addr}, with no cause.
+     *
+     * @param addr the remote address used in the message
+     * @param timeoutMillis the timeout that elapsed, in milliseconds
+     */
+    public RemotingTimeoutException(String addr, long timeoutMillis) {
+        this(addr, timeoutMillis, null);
+    }
+
+
+    /**
+     * Creates the exception for a timed-out wait on the channel to {@code addr}.
+     *
+     * @param addr the remote address used in the message
+     * @param timeoutMillis the timeout that elapsed, in milliseconds
+     * @param cause the underlying failure, may be {@code null}
+     */
+    public RemotingTimeoutException(String addr, long timeoutMillis, Throwable cause) {
+        super(describeTimeout(addr, timeoutMillis), cause);
+    }
+
+
+    /**
+     * Builds the detail message shared by the address-based constructors.
+     */
+    private static String describeTimeout(String addr, long timeoutMillis) {
+        return "wait response on the channel <" + addr + "> timeout, " + timeoutMillis + "(ms)";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
new file mode 100644
index 0000000..41be8b3
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/exception/RemotingTooMuchRequestException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.exception;
+
+/**
+ * Remoting exception carrying a caller-supplied message. Within this module it is thrown by
+ * the invoke paths of {@code NettyRemotingAbstract} when a request permit cannot be obtained.
+ *
+ * @author shijia.wxr
+ */
+public class RemotingTooMuchRequestException extends RemotingException {
+
+    private static final long serialVersionUID = 4326919581254519654L;
+
+
+    /**
+     * Creates the exception with the given detail message.
+     *
+     * @param message the detail message, passed to the superclass unchanged
+     */
+    public RemotingTooMuchRequestException(String message) {
+        super(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
new file mode 100644
index 0000000..4665b28
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyClientConfig.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+ * Mutable bag of settings for the Netty-based remoting client. Every field has a default and
+ * is exposed through a plain getter/setter pair; no validation is performed on the values.
+ *
+ * @author shijia.wxr
+ */
+public class NettyClientConfig {
+    /**
+     * Worker thread number
+     */
+    private int clientWorkerThreads = 4;
+
+    /** Size of the callback executor; defaults to the number of available processors. */
+    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
+
+    /** Permit count for one-way requests; the default comes from {@code NettySystemConfig}. */
+    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
+
+    /** Permit count for asynchronous requests; the default comes from {@code NettySystemConfig}. */
+    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
+
+    /** Connect timeout in milliseconds. */
+    private int connectTimeoutMillis = 3000;
+
+    // NOTE(review): 60 000 by default; presumably milliseconds - confirm against the consumer of this value.
+    private long channelNotActiveInterval = 1000 * 60;
+
+    /**
+     * IdleStateEvent will be triggered when neither read nor write was performed for
+     * the specified period of this time. Specify {@code 0} to disable
+     */
+    private int clientChannelMaxIdleTimeSeconds = 120;
+
+    /** Socket send buffer size; the default comes from {@code NettySystemConfig}. */
+    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
+
+    /** Socket receive buffer size; the default comes from {@code NettySystemConfig}. */
+    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
+
+    /** Whether the pooled ByteBuf allocator is enabled for the client; off by default. */
+    private boolean clientPooledByteBufAllocatorEnable = false;
+
+    // NOTE(review): name suggests the socket is closed after a request timeout - confirm in the client.
+    private boolean clientCloseSocketIfTimeout = false;
+
+
+    public int getClientWorkerThreads() {
+        return this.clientWorkerThreads;
+    }
+
+    public void setClientWorkerThreads(int clientWorkerThreads) {
+        this.clientWorkerThreads = clientWorkerThreads;
+    }
+
+
+    public int getClientCallbackExecutorThreads() {
+        return this.clientCallbackExecutorThreads;
+    }
+
+    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
+        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+    }
+
+
+    public int getClientOnewaySemaphoreValue() {
+        return this.clientOnewaySemaphoreValue;
+    }
+
+    public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
+        this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
+    }
+
+
+    public int getClientAsyncSemaphoreValue() {
+        return this.clientAsyncSemaphoreValue;
+    }
+
+    public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
+        this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
+    }
+
+
+    public int getConnectTimeoutMillis() {
+        return this.connectTimeoutMillis;
+    }
+
+    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
+        this.connectTimeoutMillis = connectTimeoutMillis;
+    }
+
+
+    public long getChannelNotActiveInterval() {
+        return this.channelNotActiveInterval;
+    }
+
+    public void setChannelNotActiveInterval(long channelNotActiveInterval) {
+        this.channelNotActiveInterval = channelNotActiveInterval;
+    }
+
+
+    public int getClientChannelMaxIdleTimeSeconds() {
+        return this.clientChannelMaxIdleTimeSeconds;
+    }
+
+    public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
+        this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
+    }
+
+
+    public int getClientSocketSndBufSize() {
+        return this.clientSocketSndBufSize;
+    }
+
+    public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
+        this.clientSocketSndBufSize = clientSocketSndBufSize;
+    }
+
+
+    public int getClientSocketRcvBufSize() {
+        return this.clientSocketRcvBufSize;
+    }
+
+    public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
+        this.clientSocketRcvBufSize = clientSocketRcvBufSize;
+    }
+
+
+    public boolean isClientPooledByteBufAllocatorEnable() {
+        return this.clientPooledByteBufAllocatorEnable;
+    }
+
+    public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
+        this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
+    }
+
+
+    public boolean isClientCloseSocketIfTimeout() {
+        return this.clientCloseSocketIfTimeout;
+    }
+
+    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
+        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
new file mode 100644
index 0000000..9e68533
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyDecoder.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Inbound decoder: lets {@link LengthFieldBasedFrameDecoder} cut one length-prefixed frame out
+ * of the byte stream and then turns that frame into a {@link RemotingCommand}.
+ *
+ * @author shijia.wxr
+ *
+ */
+public class NettyDecoder extends LengthFieldBasedFrameDecoder {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /**
+     * Upper bound on a frame's length, read once from the system property
+     * {@code com.rocketmq.remoting.frameMaxLength}; defaults to 16777216.
+     */
+    private static final int FRAME_MAX_LENGTH = //
+        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
+
+
+    /**
+     * Configures the parent for a 4-byte length field at offset 0, no length adjustment,
+     * and stripping of those 4 bytes from the emitted frame.
+     */
+    public NettyDecoder() {
+        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
+    }
+
+
+    /**
+     * Decodes at most one command from {@code in}.
+     *
+     * @return the decoded command, or {@code null} when no complete frame is available yet
+     *         or when decoding failed (in which case the channel is closed)
+     */
+    @Override
+    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
+        Object decoded = null;
+        ByteBuf frame = null;
+        try {
+            frame = (ByteBuf) super.decode(ctx, in);
+            if (frame != null) {
+                final ByteBuffer nioView = frame.nioBuffer();
+                decoded = RemotingCommand.decode(nioView);
+            }
+        } catch (Exception e) {
+            // A frame that cannot be decoded is logged and the connection is dropped.
+            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+            RemotingUtil.closeChannel(ctx.channel());
+        } finally {
+            // The frame is released on every path once decoding has been attempted.
+            if (frame != null) {
+                frame.release();
+            }
+        }
+
+        return decoded;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
new file mode 100644
index 0000000..c6c901c
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEncoder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * Outbound encoder: writes a {@link RemotingCommand} as its encoded header followed by the
+ * optional body bytes.
+ *
+ * @author shijia.wxr
+ *
+ */
+public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
+    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+    /**
+     * Serializes {@code remotingCommand} into {@code out}. Any exception is logged together
+     * with the command (when non-null) and the channel is closed; nothing is rethrown.
+     */
+    @Override
+    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
+        throws Exception {
+        try {
+            final ByteBuffer headerBytes = remotingCommand.encodeHeader();
+            out.writeBytes(headerBytes);
+
+            final byte[] payload = remotingCommand.getBody();
+            if (null != payload) {
+                out.writeBytes(payload);
+            }
+        } catch (Exception e) {
+            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
+            // remotingCommand may itself be the reason for the failure (NPE above), hence the check.
+            if (null != remotingCommand) {
+                log.error(remotingCommand.toString());
+            }
+            RemotingUtil.closeChannel(ctx.channel());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
new file mode 100644
index 0000000..14a2071
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEvent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import io.netty.channel.Channel;
+
+
+/**
+ * Immutable value describing something that happened on a channel: the kind of event,
+ * the remote address and the channel itself.
+ *
+ * @author shijia.wxr
+ */
+public class NettyEvent {
+    private final NettyEventType type;
+    private final String remoteAddr;
+    private final Channel channel;
+
+
+    /**
+     * @param type kind of event
+     * @param remoteAddr remote address associated with the event
+     * @param channel channel the event occurred on
+     */
+    public NettyEvent(NettyEventType type, String remoteAddr, Channel channel) {
+        this.type = type;
+        this.remoteAddr = remoteAddr;
+        this.channel = channel;
+    }
+
+
+    public NettyEventType getType() {
+        return this.type;
+    }
+
+
+    public String getRemoteAddr() {
+        return this.remoteAddr;
+    }
+
+
+    public Channel getChannel() {
+        return this.channel;
+    }
+
+
+    @Override
+    public String toString() {
+        final StringBuilder text = new StringBuilder("NettyEvent [type=");
+        text.append(this.type);
+        text.append(", remoteAddr=").append(this.remoteAddr);
+        text.append(", channel=").append(this.channel);
+        text.append("]");
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
new file mode 100644
index 0000000..3113147
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyEventType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+/**
+ * Kinds of channel events carried by {@code NettyEvent}. The event executor in
+ * {@code NettyRemotingAbstract} maps each constant to one {@code ChannelEventListener} callback.
+ *
+ * @author shijia.wxr
+ *
+ */
+public enum NettyEventType {
+ // dispatched to ChannelEventListener.onChannelConnect
+ CONNECT,
+ // dispatched to ChannelEventListener.onChannelClose
+ CLOSE,
+ // dispatched to ChannelEventListener.onChannelIdle
+ IDLE,
+ // dispatched to ChannelEventListener.onChannelException
+ EXCEPTION
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
new file mode 100644
index 0000000..70ae5b5
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
+import com.alibaba.rocketmq.remoting.common.ServiceThread;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class NettyRemotingAbstract {
+ // Shared remoting logger.
+ private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+
+ // Limits concurrent one-way sends: invokeOnewayImpl takes a permit before writing and
+ // gives it back when the write completes (or fails to start).
+ protected final Semaphore semaphoreOneway;
+
+
+ // Limits concurrent async requests: invokeAsyncImpl takes a permit and hands it to the
+ // ResponseFuture, which releases it.
+ protected final Semaphore semaphoreAsync;
+
+
+ // Outstanding requests keyed by opaque. Entries are added by invokeSyncImpl/invokeAsyncImpl and
+ // removed when the response arrives, the send fails, or scanResponseTable finds them expired.
+ protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
+ new ConcurrentHashMap<Integer, ResponseFuture>(256);
+
+ // Request code -> (processor, executor) used by processRequestCommand.
+ // NOTE(review): plain HashMap and never written in this class; presumably filled by subclasses
+ // before traffic starts - confirm.
+ protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
+ new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
+ // Queues NettyEvents and forwards them to the ChannelEventListener.
+ protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();
+
+ // Fallback used by processRequestCommand when processorTable has no entry for a request code.
+ protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
+
+
+ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
+ this.semaphoreOneway = new Semaphore(permitsOneway, true);
+ this.semaphoreAsync = new Semaphore(permitsAsync, true);
+ }
+
+ /**
+  * Supplies the listener that receives channel events from the event executor.
+  * The executor tolerates a {@code null} return: events are then taken off the queue and ignored.
+  */
+ public abstract ChannelEventListener getChannelEventListener();
+
+ /**
+  * Hands a channel event to the event executor for later delivery to the listener.
+  *
+  * @param event the event to enqueue
+  */
+ public void putNettyEvent(final NettyEvent event) {
+     nettyEventExecuter.putNettyEvent(event);
+ }
+
+ /**
+  * Entry point for a decoded command: routes requests and responses to their handlers.
+  * A {@code null} message and unknown command types are ignored.
+  *
+  * @param ctx channel context the command arrived on
+  * @param msg the decoded command, may be {@code null}
+  */
+ public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+     if (null == msg) {
+         return;
+     }
+
+     switch (msg.getType()) {
+         case REQUEST_COMMAND:
+             processRequestCommand(ctx, msg);
+             break;
+         case RESPONSE_COMMAND:
+             processResponseCommand(ctx, msg);
+             break;
+         default:
+             break;
+     }
+ }
+
+ public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
+ final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
+ final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
+ final int opaque = cmd.getOpaque();
+
+ if (pair != null) {
+ Runnable run = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
+ if (rpcHook != null) {
+ rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
+ }
+
+ final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
+ if (rpcHook != null) {
+ rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
+ }
+
+ if (!cmd.isOnewayRPC()) {
+ if (response != null) {
+ response.setOpaque(opaque);
+ response.markResponseType();
+ try {
+ ctx.writeAndFlush(response);
+ } catch (Throwable e) {
+ PLOG.error("process request over, but response failed", e);
+ PLOG.error(cmd.toString());
+ PLOG.error(response.toString());
+ }
+ } else {
+
+ }
+ }
+ } catch (Throwable e) {
+ if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
+ .equals(e.getClass().getCanonicalName())) {
+ PLOG.error("process request exception", e);
+ PLOG.error(cmd.toString());
+ }
+
+ if (!cmd.isOnewayRPC()) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
+ RemotingHelper.exceptionSimpleDesc(e));
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ }
+ }
+ }
+ };
+
+ if (pair.getObject1().rejectRequest()) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+ "[REJECTREQUEST]system busy, start flow control for a while");
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ return;
+ }
+
+ try {
+ final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
+ pair.getObject2().submit(requestTask);
+ } catch (RejectedExecutionException e) {
+ if ((System.currentTimeMillis() % 10000) == 0) {
+ PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
+ + ", too many requests and system thread pool busy, RejectedExecutionException " //
+ + pair.getObject2().toString() //
+ + " request code: " + cmd.getCode());
+ }
+
+ if (!cmd.isOnewayRPC()) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
+ "[OVERLOAD]system busy, start flow control for a while");
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ }
+ }
+ } else {
+ String error = " request type " + cmd.getCode() + " not supported";
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
+ response.setOpaque(opaque);
+ ctx.writeAndFlush(response);
+ PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
+ }
+ }
+
+ /**
+  * Handles an incoming response: matches it to the pending {@code ResponseFuture} by opaque,
+  * releases the future and removes it from the table, then either runs the async callback
+  * (on the callback executor when available, otherwise on the calling thread) or wakes the
+  * synchronous waiter.
+  *
+  * @param ctx channel context the response arrived on
+  * @param cmd the response command
+  */
+ public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
+     final int opaque = cmd.getOpaque();
+     final ResponseFuture responseFuture = responseTable.get(opaque);
+     if (null == responseFuture) {
+         PLOG.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+         PLOG.warn(cmd.toString());
+         return;
+     }
+
+     responseFuture.setResponseCommand(cmd);
+
+     responseFuture.release();
+
+     responseTable.remove(opaque);
+
+     if (null == responseFuture.getInvokeCallback()) {
+         // No callback registered: hand the response to whoever waits on the future.
+         responseFuture.putResponse(cmd);
+         return;
+     }
+
+     boolean submitted = false;
+     final ExecutorService callbackExecutor = this.getCallbackExecutor();
+     if (callbackExecutor != null) {
+         try {
+             callbackExecutor.submit(new Runnable() {
+                 @Override
+                 public void run() {
+                     try {
+                         responseFuture.executeInvokeCallback();
+                     } catch (Throwable e) {
+                         PLOG.warn("execute callback in executor exception, and callback throw", e);
+                     }
+                 }
+             });
+             submitted = true;
+         } catch (Exception e) {
+             PLOG.warn("execute callback in executor exception, maybe executor busy", e);
+         }
+     }
+
+     // Fallback: no executor, or the executor refused the task.
+     if (!submitted) {
+         try {
+             responseFuture.executeInvokeCallback();
+         } catch (Throwable e) {
+             PLOG.warn("executeInvokeCallback Exception", e);
+         }
+     }
+ }
+
+ /**
+  * Supplies the hook invoked around request processing in {@code processRequestCommand};
+  * a {@code null} return is allowed and disables the hook calls.
+  */
+ public abstract RPCHook getRPCHook();
+
+ /**
+  * Supplies the executor on which async invoke callbacks are run. When this returns
+  * {@code null}, {@code processResponseCommand} runs the callback on its own calling thread.
+  */
+ abstract public ExecutorService getCallbackExecutor();
+
+ /**
+  * Sweeps the response table for futures whose begin time plus timeout plus a one-second
+  * grace period has passed. Each such future is released and removed, and afterwards its
+  * invoke callback is executed; exceptions thrown by callbacks are logged and swallowed.
+  */
+ public void scanResponseTable() {
+     final List<ResponseFuture> expired = new LinkedList<ResponseFuture>();
+
+     for (Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator(); it.hasNext();) {
+         final ResponseFuture future = it.next().getValue();
+
+         final boolean stillPending =
+             (future.getBeginTimestamp() + future.getTimeoutMillis() + 1000) > System.currentTimeMillis();
+         if (stillPending) {
+             continue;
+         }
+
+         future.release();
+         it.remove();
+         expired.add(future);
+         PLOG.warn("remove timeout request, " + future);
+     }
+
+     // Callbacks run only after the sweep, outside of the table iteration.
+     for (ResponseFuture future : expired) {
+         try {
+             future.executeInvokeCallback();
+         } catch (Throwable e) {
+             PLOG.warn("scanResponseTable, operationComplete Exception", e);
+         }
+     }
+ }
+
+ /**
+  * Sends {@code request} on {@code channel} and blocks until the response arrives or
+  * {@code timeoutMillis} elapses. The pending future is always removed from the response
+  * table before returning.
+  *
+  * @param channel channel to write the request to
+  * @param request the request command; its opaque keys the pending future
+  * @param timeoutMillis maximum time to wait for the response, in milliseconds
+  * @return the response command, never {@code null}
+  * @throws RemotingTimeoutException when the request was written but no response arrived in time
+  * @throws RemotingSendRequestException when writing the request failed
+  * @throws InterruptedException declared for the blocking wait on the response
+  */
+ public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+     throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+     final int opaque = request.getOpaque();
+
+     try {
+         final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
+         this.responseTable.put(opaque, responseFuture);
+         final SocketAddress addr = channel.remoteAddress();
+         channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture f) throws Exception {
+                 final boolean written = f.isSuccess();
+                 responseFuture.setSendRequestOK(written);
+                 if (written) {
+                     return;
+                 }
+
+                 // Write failed: drop the pending entry and wake the waiter with a null response.
+                 responseTable.remove(opaque);
+                 responseFuture.setCause(f.cause());
+                 responseFuture.putResponse(null);
+                 PLOG.warn("send a request command to channel <" + addr + "> failed.");
+             }
+         });
+
+         final RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
+         if (responseCommand != null) {
+             return responseCommand;
+         }
+
+         // Null response: distinguish "sent but unanswered" from "never sent".
+         if (responseFuture.isSendRequestOK()) {
+             throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
+                 responseFuture.getCause());
+         }
+         throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
+     } finally {
+         this.responseTable.remove(opaque);
+     }
+ }
+
+ /**
+  * Sends {@code request} on {@code channel} without blocking for the response;
+  * {@code invokeCallback} is run once the response arrives, the write fails, or the request
+  * is expired by {@code scanResponseTable}. A permit of the async semaphore is held for the
+  * lifetime of the request.
+  *
+  * @param channel channel to write the request to
+  * @param request the request command; its opaque keys the pending future
+  * @param timeoutMillis time to wait for a semaphore permit and timeout recorded on the future, in milliseconds
+  * @param invokeCallback callback to run when the request completes
+  * @throws RemotingTooMuchRequestException when no permit could be acquired within {@code timeoutMillis}
+  * @throws RemotingSendRequestException when writing to the channel throws synchronously
+  * @throws InterruptedException when interrupted while waiting for a permit
+  * @throws RemotingTimeoutException declared but not thrown by this implementation
+  */
+ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+     final InvokeCallback invokeCallback)
+     throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+     final int opaque = request.getOpaque();
+     boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+     if (!acquired) {
+         String info =
+             String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
+                 timeoutMillis, //
+                 this.semaphoreAsync.getQueueLength(), //
+                 this.semaphoreAsync.availablePermits()//
+             );
+         PLOG.warn(info);
+         throw new RemotingTooMuchRequestException(info);
+     }
+
+     final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+
+     final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
+     this.responseTable.put(opaque, responseFuture);
+     try {
+         channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture f) throws Exception {
+                 if (f.isSuccess()) {
+                     responseFuture.setSendRequestOK(true);
+                     return;
+                 } else {
+                     responseFuture.setSendRequestOK(false);
+                 }
+
+                 // Write failed: complete the future with no response and notify the caller now.
+                 responseFuture.putResponse(null);
+                 responseTable.remove(opaque);
+                 try {
+                     responseFuture.executeInvokeCallback();
+                 } catch (Throwable e) {
+                     PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e);
+                 } finally {
+                     responseFuture.release();
+                 }
+
+                 PLOG.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+             }
+         });
+     } catch (Exception e) {
+         // The caller learns about this failure through the exception below, so the pending
+         // entry must not stay behind: otherwise scanResponseTable would later expire it and
+         // run the callback for the same request a second time.
+         this.responseTable.remove(opaque);
+         responseFuture.release();
+         PLOG.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+         throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+     }
+ }
+
+ /**
+  * Sends {@code request} as a one-way RPC: the request is marked one-way, written to the
+  * channel, and no response is awaited. A permit of the one-way semaphore is held until the
+  * write completes.
+  *
+  * @param channel channel to write the request to
+  * @param request the request command; marked as one-way by this method
+  * @param timeoutMillis maximum time to wait for a semaphore permit, in milliseconds
+  * @throws RemotingTooMuchRequestException when no permit is available and {@code timeoutMillis <= 0}
+  * @throws RemotingTimeoutException when no permit became available within a positive {@code timeoutMillis}
+  * @throws RemotingSendRequestException when writing to the channel throws synchronously
+  * @throws InterruptedException when interrupted while waiting for a permit
+  */
+ public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+     throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+     request.markOnewayRPC();
+     boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+     if (!acquired) {
+         if (timeoutMillis <= 0) {
+             throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
+         }
+
+         // The figures below come from the one-way semaphore, so they are labelled as such.
+         String info = String.format(
+             "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d", //
+             timeoutMillis, //
+             this.semaphoreOneway.getQueueLength(), //
+             this.semaphoreOneway.availablePermits()//
+         );
+         PLOG.warn(info);
+         throw new RemotingTimeoutException(info);
+     }
+
+     final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
+     try {
+         channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture f) throws Exception {
+                 // The permit is returned as soon as the write finishes, successful or not.
+                 once.release();
+                 if (!f.isSuccess()) {
+                     PLOG.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
+                 }
+             }
+         });
+     } catch (Exception e) {
+         once.release();
+         // Log the exception itself so the stack trace is not lost.
+         PLOG.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.", e);
+         throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
+     }
+ }
+
+ /**
+  * Service thread that buffers {@link NettyEvent}s in a queue and delivers them, one at a
+  * time, to the {@link ChannelEventListener} of the enclosing instance.
+  */
+ class NettyEventExecuter extends ServiceThread {
+     private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
+     // Events are dropped once the queue holds more than this many entries.
+     private final int maxSize = 10000;
+
+
+     /**
+      * Enqueues {@code event}, or logs and drops it when the queue is over its size limit.
+      */
+     public void putNettyEvent(final NettyEvent event) {
+         if (this.eventQueue.size() > maxSize) {
+             PLOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
+             return;
+         }
+
+         this.eventQueue.add(event);
+     }
+
+
+     /**
+      * Polls the queue (waiting up to 3 seconds per poll) until the service is stopped and
+      * dispatches each event to the matching listener callback. The listener is looked up
+      * once, when the loop starts; exceptions inside the loop are logged and the loop continues.
+      */
+     @Override
+     public void run() {
+         PLOG.info(this.getServiceName() + " service started");
+
+         final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
+
+         while (!this.isStopped()) {
+             try {
+                 final NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
+                 if (null == event || null == listener) {
+                     continue;
+                 }
+
+                 final String remoteAddr = event.getRemoteAddr();
+                 switch (event.getType()) {
+                     case IDLE:
+                         listener.onChannelIdle(remoteAddr, event.getChannel());
+                         break;
+                     case CLOSE:
+                         listener.onChannelClose(remoteAddr, event.getChannel());
+                         break;
+                     case CONNECT:
+                         listener.onChannelConnect(remoteAddr, event.getChannel());
+                         break;
+                     case EXCEPTION:
+                         listener.onChannelException(remoteAddr, event.getChannel());
+                         break;
+                     default:
+                         break;
+                 }
+             } catch (Exception e) {
+                 PLOG.warn(this.getServiceName() + " service has exception. ", e);
+             }
+         }
+
+         PLOG.info(this.getServiceName() + " service end");
+     }
+
+
+     @Override
+     public String getServiceName() {
+         return NettyEventExecuter.class.getSimpleName();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
new file mode 100644
index 0000000..68555c5
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -0,0 +1,682 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingClient;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.RemotingConnectException;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+
+ private final NettyClientConfig nettyClientConfig;
+ private final Bootstrap bootstrap = new Bootstrap();
+ private final EventLoopGroup eventLoopGroupWorker;
+ private final Lock lockChannelTables = new ReentrantLock();
+ private final ConcurrentHashMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();
+
+ private final Timer timer = new Timer("ClientHouseKeepingService", true);
+
+ private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
+ private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
+ private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
+ private final Lock lockNamesrvChannel = new ReentrantLock();
+
+ private final ExecutorService publicExecutor;
+ private final ChannelEventListener channelEventListener;
+ private DefaultEventExecutorGroup defaultEventExecutorGroup;
+ private RPCHook rpcHook;
+
+ /** Creates a client that has no channel event listener. */
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
+     this(nettyClientConfig, (ChannelEventListener) null);
+ }
+
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
+ final ChannelEventListener channelEventListener) {
+ super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
+ this.nettyClientConfig = nettyClientConfig;
+ this.channelEventListener = channelEventListener;
+
+ int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
+ if (publicThreadNums <= 0) {
+ publicThreadNums = 4;
+ }
+
+ this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
+ }
+ });
+
+ this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+ }
+
+ private static int initValueIndex() {
+ Random r = new Random();
+
+ return Math.abs(r.nextInt() % 999) % 999;
+ }
+
+ /**
+  * Starts the client: creates the worker executor group, configures the bootstrap and its
+  * channel pipeline, schedules {@code scanResponseTable} once per second after a 3 second
+  * delay, and starts the event executer when a channel event listener is present.
+  */
+ @Override
+ public void start() {
+     final ThreadFactory workerThreadFactory = new ThreadFactory() {
+         private final AtomicInteger sequence = new AtomicInteger(0);
+
+         @Override
+         public Thread newThread(Runnable task) {
+             return new Thread(task, "NettyClientWorkerThread_" + this.sequence.incrementAndGet());
+         }
+     };
+     this.defaultEventExecutorGroup =
+         new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), workerThreadFactory);
+
+     // Every handler of the pipeline runs on the worker executor group created above.
+     final ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+         @Override
+         public void initChannel(SocketChannel ch) throws Exception {
+             ch.pipeline().addLast(
+                 defaultEventExecutorGroup,
+                 new NettyEncoder(),
+                 new NettyDecoder(),
+                 new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
+                 new NettyConnetManageHandler(),
+                 new NettyClientHandler());
+         }
+     };
+
+     this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
+         .option(ChannelOption.TCP_NODELAY, true)
+         .option(ChannelOption.SO_KEEPALIVE, false)
+         .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
+         .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
+         .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
+         .handler(pipelineInitializer);
+
+     final TimerTask scanTask = new TimerTask() {
+         @Override
+         public void run() {
+             try {
+                 NettyRemotingClient.this.scanResponseTable();
+             } catch (Exception e) {
+                 log.error("scanResponseTable exception", e);
+             }
+         }
+     };
+     this.timer.scheduleAtFixedRate(scanTask, 1000 * 3, 1000);
+
+     if (this.channelEventListener != null) {
+         this.nettyEventExecuter.start();
+     }
+ }
+
+ /**
+  * Shuts the client down: cancels the housekeeping timer, closes and forgets every cached
+  * channel, then stops the event loop group, the event executer, the worker executor group
+  * and finally the public callback executor. Failures are logged, not propagated.
+  */
+ @Override
+ public void shutdown() {
+     try {
+         this.timer.cancel();
+
+         for (ChannelWrapper cw : this.channelTables.values()) {
+             this.closeChannel(null, cw.getChannel());
+         }
+
+         this.channelTables.clear();
+
+         this.eventLoopGroupWorker.shutdownGracefully();
+
+         if (this.nettyEventExecuter != null) {
+             this.nettyEventExecuter.shutdown();
+         }
+
+         if (this.defaultEventExecutorGroup != null) {
+             this.defaultEventExecutorGroup.shutdownGracefully();
+         }
+     } catch (Exception e) {
+         log.error("NettyRemotingClient shutdown exception, ", e);
+     }
+
+     if (this.publicExecutor != null) {
+         try {
+             this.publicExecutor.shutdown();
+         } catch (Exception e) {
+             // Message previously named NettyRemotingServer, which pointed at the wrong component.
+             log.error("NettyRemotingClient shutdown exception, ", e);
+         }
+     }
+ }
+
+ /**
+  * Closes the given channel and, when the channel table still maps the address to exactly
+  * this channel, removes that mapping. A {@code null} address is derived from the channel.
+  * Nothing happens when the channel is {@code null} or the table lock cannot be taken in time.
+  */
+ public void closeChannel(final String addr, final Channel channel) {
+     if (null == channel) {
+         return;
+     }
+
+     final String addrRemote = (addr != null) ? addr : RemotingHelper.parseChannelRemoteAddr(channel);
+
+     try {
+         if (!this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+             return;
+         }
+
+         try {
+             final ChannelWrapper prevCW = this.channelTables.get(addrRemote);
+
+             log.info("closeChannel: begin close the channel[{}] Found: {}", addrRemote, prevCW != null);
+
+             if (null == prevCW) {
+                 log.info("closeChannel: the channel[{}] has been removed from the channel table before", addrRemote);
+             } else if (prevCW.getChannel() != channel) {
+                 // The table already holds a newer connection for this address; keep it.
+                 log.info("closeChannel: the channel[{}] has been closed before, and has been created again, nothing to do.",
+                     addrRemote);
+             } else {
+                 this.channelTables.remove(addrRemote);
+                 log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+             }
+
+             // The channel itself is closed in every case.
+             RemotingUtil.closeChannel(channel);
+         } catch (Exception e) {
+             log.error("closeChannel: close the channel exception", e);
+         } finally {
+             this.lockChannelTables.unlock();
+         }
+     } catch (InterruptedException e) {
+         log.error("closeChannel exception", e);
+     }
+ }
+
+ /** Stores the hook that the invoke methods call around requests. */
+ @Override
+ public void registerRPCHook(final RPCHook rpcHook) {
+     this.rpcHook = rpcHook;
+ }
+
+ /**
+  * Looks the channel up in the channel table by identity and, when found, removes its entry
+  * and closes the channel. A channel that is not in the table is only reported in the log.
+  * Nothing happens when the channel is {@code null} or the table lock cannot be taken in time.
+  */
+ public void closeChannel(final Channel channel) {
+     if (null == channel) {
+         return;
+     }
+
+     try {
+         if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+             try {
+                 String addrRemote = null;
+                 for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) {
+                     // channel is non-null here, so the identity check also rejects wrappers without a channel.
+                     if (entry.getValue().getChannel() == channel) {
+                         addrRemote = entry.getKey();
+                         break;
+                     }
+                 }
+
+                 if (null == addrRemote) {
+                     // No table key exists on this path, so report the address taken from the channel
+                     // itself instead of logging "channel[null]".
+                     log.info("eventCloseChannel: the channel[{}] has been removed from the channel table before",
+                         RemotingHelper.parseChannelRemoteAddr(channel));
+                 } else {
+                     this.channelTables.remove(addrRemote);
+                     log.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
+                     RemotingUtil.closeChannel(channel);
+                 }
+             } catch (Exception e) {
+                 log.error("closeChannel: close the channel exception", e);
+             } finally {
+                 this.lockChannelTables.unlock();
+             }
+         } else {
+             log.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+         }
+     } catch (InterruptedException e) {
+         log.error("closeChannel exception", e);
+     }
+ }
+
+ /**
+  * Replaces the stored name server address list when the given list differs from it in size
+  * or contains an address the stored list lacks. The stored list is a shuffled copy, so the
+  * caller's list is never modified and may be unmodifiable.
+  *
+  * @param addrs new name server addresses; {@code null} or empty leaves the stored list untouched
+  */
+ @Override
+ public void updateNameServerAddressList(List<String> addrs) {
+     if (null == addrs || addrs.isEmpty()) {
+         return;
+     }
+
+     final List<String> old = this.namesrvAddrList.get();
+     boolean update = false;
+
+     if (null == old || addrs.size() != old.size()) {
+         update = true;
+     } else {
+         for (String addr : addrs) {
+             if (!old.contains(addr)) {
+                 update = true;
+                 break;
+             }
+         }
+     }
+
+     if (update) {
+         // Shuffle a private copy: Collections.shuffle reorders in place and fails on read-only lists.
+         final List<String> shuffled = new ArrayList<String>(addrs);
+         Collections.shuffle(shuffled);
+         this.namesrvAddrList.set(shuffled);
+     }
+ }
+
+ /** Returns the currently stored name server address list, or {@code null} if none was set. */
+ @Override
+ public List<String> getNameServerAddressList() {
+     final List<String> current = this.namesrvAddrList.get();
+     return current;
+ }
+
+ /**
+  * Sends the request over the channel for {@code addr} and waits for the response.
+  * The registered RPC hook, if any, is called before the request and after the response.
+  * The channel is closed on a send failure, and on a timeout when the configuration asks for it.
+  *
+  * @throws RemotingConnectException when no active channel is available for {@code addr}
+  */
+ @Override
+ public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
+     throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
+     final Channel channel = this.getAndCreateChannel(addr);
+     if (null == channel || !channel.isActive()) {
+         this.closeChannel(addr, channel);
+         throw new RemotingConnectException(addr);
+     }
+
+     try {
+         if (this.rpcHook != null) {
+             this.rpcHook.doBeforeRequest(addr, request);
+         }
+
+         final RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
+
+         if (this.rpcHook != null) {
+             this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
+         }
+         return response;
+     } catch (RemotingSendRequestException e) {
+         log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
+         this.closeChannel(addr, channel);
+         throw e;
+     } catch (RemotingTimeoutException e) {
+         if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
+             this.closeChannel(addr, channel);
+             log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
+         }
+         log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
+         throw e;
+     }
+ }
+
+ private Channel getAndCreateChannel(final String addr) throws InterruptedException {
+ if (null == addr)
+ return getAndCreateNameserverChannel();
+
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw != null && cw.isOK()) {
+ return cw.getChannel();
+ }
+
+ return this.createChannel(addr);
+ }
+
+ /**
+  * Returns a usable channel to a name server, or {@code null} when none can be obtained.
+  * The channel of the currently chosen address is reused while it is usable; otherwise the
+  * addresses of the stored list are tried in turn, at most once each, under the name server lock.
+  */
+ private Channel getAndCreateNameserverChannel() throws InterruptedException {
+     String addr = this.namesrvAddrChoosed.get();
+     if (addr != null) {
+         ChannelWrapper cw = this.channelTables.get(addr);
+         if (cw != null && cw.isOK()) {
+             return cw.getChannel();
+         }
+     }
+
+     final List<String> addrList = this.namesrvAddrList.get();
+     if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+         try {
+             // Re-check under the lock: another thread may have chosen an address meanwhile.
+             addr = this.namesrvAddrChoosed.get();
+             if (addr != null) {
+                 ChannelWrapper cw = this.channelTables.get(addr);
+                 if (cw != null && cw.isOK()) {
+                     return cw.getChannel();
+                 }
+             }
+
+             if (addrList != null && !addrList.isEmpty()) {
+                 for (int i = 0; i < addrList.size(); i++) {
+                     // Reduce first, then take the absolute value: Math.abs(Integer.MIN_VALUE) is
+                     // still negative and would produce a negative list index once the counter wraps.
+                     final int index = Math.abs(this.namesrvIndex.incrementAndGet() % addrList.size());
+                     String newAddr = addrList.get(index);
+
+                     this.namesrvAddrChoosed.set(newAddr);
+                     Channel channelNew = this.createChannel(newAddr);
+                     if (channelNew != null) {
+                         return channelNew;
+                     }
+                 }
+             }
+         } catch (Exception e) {
+             log.error("getAndCreateNameserverChannel: create name server channel exception", e);
+         } finally {
+             this.lockNamesrvChannel.unlock();
+         }
+     } else {
+         log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+     }
+
+     return null;
+ }
+
+ private Channel createChannel(final String addr) throws InterruptedException {
+ ChannelWrapper cw = this.channelTables.get(addr);
+ if (cw != null && cw.isOK()) {
+ return cw.getChannel();
+ }
+
+
+ if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ boolean createNewConnection = false;
+ cw = this.channelTables.get(addr);
+ if (cw != null) {
+
+ if (cw.isOK()) {
+ return cw.getChannel();
+ } else if (!cw.getChannelFuture().isDone()) {
+ createNewConnection = false;
+ } else {
+ this.channelTables.remove(addr);
+ createNewConnection = true;
+ }
+ } else {
+ createNewConnection = true;
+ }
+
+ if (createNewConnection) {
+ ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
+ log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
+ cw = new ChannelWrapper(channelFuture);
+ this.channelTables.put(addr, cw);
+ }
+ } catch (Exception e) {
+ log.error("createChannel: create channel exception", e);
+ } finally {
+ this.lockChannelTables.unlock();
+ }
+ } else {
+ log.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
+ }
+
+ if (cw != null) {
+ ChannelFuture channelFuture = cw.getChannelFuture();
+ if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
+ if (cw.isOK()) {
+ log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
+ return cw.getChannel();
+ } else {
+ log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
+ }
+ } else {
+ log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+ channelFuture.toString());
+ }
+ }
+
+ return null;
+ }
+
+ /**
+  * Sends the request asynchronously over the channel for {@code addr}; the callback is passed
+  * on to {@code invokeAsyncImpl}. The registered RPC hook, if any, is called before the request.
+  * The channel is closed when sending fails.
+  *
+  * @throws RemotingConnectException when no active channel is available for {@code addr}
+  */
+ @Override
+ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+     throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
+     RemotingSendRequestException {
+     final Channel channel = this.getAndCreateChannel(addr);
+     if (null == channel || !channel.isActive()) {
+         this.closeChannel(addr, channel);
+         throw new RemotingConnectException(addr);
+     }
+
+     try {
+         if (this.rpcHook != null) {
+             this.rpcHook.doBeforeRequest(addr, request);
+         }
+         this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+     } catch (RemotingSendRequestException e) {
+         log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
+         this.closeChannel(addr, channel);
+         throw e;
+     }
+ }
+
+ /**
+  * Sends the request over the channel for {@code addr} without waiting for a response.
+  * The registered RPC hook, if any, is called before the request.
+  * The channel is closed when sending fails.
+  *
+  * @throws RemotingConnectException when no active channel is available for {@code addr}
+  */
+ @Override
+ public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+     RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+     final Channel channel = this.getAndCreateChannel(addr);
+     if (null == channel || !channel.isActive()) {
+         this.closeChannel(addr, channel);
+         throw new RemotingConnectException(addr);
+     }
+
+     try {
+         if (this.rpcHook != null) {
+             this.rpcHook.doBeforeRequest(addr, request);
+         }
+         this.invokeOnewayImpl(channel, request, timeoutMillis);
+     } catch (RemotingSendRequestException e) {
+         log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
+         this.closeChannel(addr, channel);
+         throw e;
+     }
+ }
+
+ /**
+  * Registers the processor for a request code. When no executor is given, the public
+  * callback executor is paired with the processor instead.
+  */
+ @Override
+ public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+     final ExecutorService executorThis = (executor != null) ? executor : this.publicExecutor;
+
+     this.processorTable.put(requestCode,
+         new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis));
+ }
+
+ /**
+  * Reports whether the cached channel for {@code addr} is writable.
+  * Returns {@code true} when there is no usable cached channel for the address.
+  */
+ @Override
+ public boolean isChannelWriteable(String addr) {
+     final ChannelWrapper cached = this.channelTables.get(addr);
+     if (null == cached || !cached.isOK()) {
+         return true;
+     }
+     return cached.isWriteable();
+ }
+
+ /** Returns the listener given at construction time; may be {@code null}. */
+ @Override
+ public ChannelEventListener getChannelEventListener() {
+     return this.channelEventListener;
+ }
+
+ /** Returns the registered RPC hook; {@code null} when none was registered. */
+ @Override
+ public RPCHook getRPCHook() {
+     final RPCHook hook = this.rpcHook;
+     return hook;
+ }
+
+ /** Returns the public executor created in the constructor. */
+ @Override
+ public ExecutorService getCallbackExecutor() {
+     final ExecutorService callbackExecutor = this.publicExecutor;
+     return callbackExecutor;
+ }
+
+ /** Returns the stored name server address list, or {@code null} if none was set. */
+ public List<String> getNamesrvAddrList() {
+     return this.namesrvAddrList.get();
+ }
+
+ /** Returns the registered RPC hook; same value as {@code getRPCHook()}. */
+ public RPCHook getRpcHook() {
+     return this.rpcHook;
+ }
+
+ /** Holds the connect future of a channel and answers state questions about that channel. */
+ static class ChannelWrapper {
+     private final ChannelFuture channelFuture;
+
+
+     public ChannelWrapper(ChannelFuture channelFuture) {
+         this.channelFuture = channelFuture;
+     }
+
+
+     /** Returns {@code true} when the future has a channel and that channel is active. */
+     public boolean isOK() {
+         final Channel channel = this.channelFuture.channel();
+         return channel != null && channel.isActive();
+     }
+
+
+     /** Returns whether the channel is writable; does not check for a missing channel. */
+     public boolean isWriteable() {
+         final Channel channel = this.channelFuture.channel();
+         return channel.isWritable();
+     }
+
+
+     private Channel getChannel() {
+         return this.channelFuture.channel();
+     }
+
+
+     public ChannelFuture getChannelFuture() {
+         return this.channelFuture;
+     }
+ }
+
+ /** Inbound handler that hands every received command to {@code processMessageReceived}. */
+ class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+
+     @Override
+     protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception {
+         NettyRemotingClient.this.processMessageReceived(ctx, msg);
+     }
+ }
+
+ /**
+  * Pipeline handler that logs connection life-cycle operations, removes closed or idle
+  * channels from the channel table, and queues a matching event when a channel event
+  * listener is configured.
+  */
+ class NettyConnetManageHandler extends ChannelDuplexHandler {
+     @Override
+     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise)
+         throws Exception {
+         final String local = localAddress == null ? "UNKNOW" : localAddress.toString();
+         final String remote = remoteAddress == null ? "UNKNOW" : remoteAddress.toString();
+         log.info("NETTY CLIENT PIPELINE: CONNECT {} => {}", local, remote);
+         super.connect(ctx, remoteAddress, localAddress, promise);
+
+         if (NettyRemotingClient.this.channelEventListener != null) {
+             // Use the null-safe string computed above; dereferencing remoteAddress here would
+             // throw for the very case the "UNKNOW" fallback handles.
+             NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
+         }
+     }
+
+
+     @Override
+     public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
+         closeChannel(ctx.channel());
+         super.disconnect(ctx, promise);
+
+         if (NettyRemotingClient.this.channelEventListener != null) {
+             NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+         }
+     }
+
+
+     @Override
+     public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+         closeChannel(ctx.channel());
+         super.close(ctx, promise);
+
+         if (NettyRemotingClient.this.channelEventListener != null) {
+             NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
+         }
+     }
+
+     @Override
+     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+         if (evt instanceof IdleStateEvent) {
+             final IdleStateEvent idleEvent = (IdleStateEvent) evt;
+             // Only the all-idle state closes the channel.
+             if (idleEvent.state().equals(IdleState.ALL_IDLE)) {
+                 final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                 log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
+                 closeChannel(ctx.channel());
+                 if (NettyRemotingClient.this.channelEventListener != null) {
+                     NettyRemotingClient.this
+                         .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
+                 }
+             }
+         }
+
+         // The event is always passed on to the next handler.
+         ctx.fireUserEventTriggered(evt);
+     }
+
+     @Override
+     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
+         log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
+         closeChannel(ctx.channel());
+         if (NettyRemotingClient.this.channelEventListener != null) {
+             NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
new file mode 100644
index 0000000..a14947e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import com.alibaba.rocketmq.remoting.InvokeCallback;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.common.Pair;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTooMuchRequestException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
+ private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
+ private final ServerBootstrap serverBootstrap;
+ private final EventLoopGroup eventLoopGroupSelector;
+ private final EventLoopGroup eventLoopGroupBoss;
+ private final NettyServerConfig nettyServerConfig;
+
+ private final ExecutorService publicExecutor;
+ private final ChannelEventListener channelEventListener;
+
+ private final Timer timer = new Timer("ServerHouseKeepingService", true);
+ private DefaultEventExecutorGroup defaultEventExecutorGroup;
+
+ private RPCHook rpcHook;
+
+
+ private int port = 0;
+
+
+ /** Creates a server that has no channel event listener. */
+ public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
+     this(nettyServerConfig, (ChannelEventListener) null);
+ }
+
+
+ public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) {
+ super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
+ this.serverBootstrap = new ServerBootstrap();
+ this.nettyServerConfig = nettyServerConfig;
+ this.channelEventListener = channelEventListener;
+
+ int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
+ if (publicThreadNums <= 0) {
+ publicThreadNums = 4;
+ }
+
+ this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
+ }
+ });
+
+ this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
+ }
+ });
+
+ if (RemotingUtil.isLinuxPlatform() //
+ && nettyServerConfig.isUseEpollNativeSelector()) {
+ this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+ private int threadTotal = nettyServerConfig.getServerSelectorThreads();
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
+ }
+ });
+ } else {
+ this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+ private int threadTotal = nettyServerConfig.getServerSelectorThreads();
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
+ }
+ });
+ }
+ }
+
+
+ /**
+  * Starts the server: creates the codec executor group, configures and binds the server
+  * bootstrap, records the bound port, starts the event executer when a channel event
+  * listener is present, and schedules {@code scanResponseTable} once per second after a
+  * 3 second delay.
+  *
+  * @throws RuntimeException when the thread is interrupted while waiting for the bind
+  */
+ @Override
+ public void start() {
+     this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
+         nettyServerConfig.getServerWorkerThreads(), //
+         new ThreadFactory() {
+
+             private AtomicInteger threadIndex = new AtomicInteger(0);
+
+
+             @Override
+             public Thread newThread(Runnable r) {
+                 return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
+             }
+         });
+
+     // NOTE(review): the constructor may create an EpollEventLoopGroup for the selector group while
+     // the channel type here is always NioServerSocketChannel - confirm this combination is intended.
+     ServerBootstrap childHandler =
+         this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector).channel(NioServerSocketChannel.class)
+             // option(...) configures the listening channel only.
+             .option(ChannelOption.SO_BACKLOG, 1024)
+             .option(ChannelOption.SO_REUSEADDR, true)
+             .option(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
+             // childOption(...) configures every accepted connection; keep-alive and the send
+             // buffer are per-connection settings, so they are set here rather than on the
+             // listening channel.
+             .childOption(ChannelOption.SO_KEEPALIVE, false)
+             .childOption(ChannelOption.TCP_NODELAY, true)
+             .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
+             .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
+             .childHandler(new ChannelInitializer<SocketChannel>() {
+                 @Override
+                 public void initChannel(SocketChannel ch) throws Exception {
+                     ch.pipeline().addLast(
+                         defaultEventExecutorGroup,
+                         new NettyEncoder(),
+                         new NettyDecoder(),
+                         new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
+                         new NettyConnetManageHandler(),
+                         new NettyServerHandler());
+                 }
+             });
+
+     if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
+         childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+     }
+
+     try {
+         ChannelFuture sync = this.serverBootstrap.bind().sync();
+         // Read the port back from the bound channel instead of reusing the configured value.
+         InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
+         this.port = addr.getPort();
+     } catch (InterruptedException e1) {
+         throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
+     }
+
+     if (this.channelEventListener != null) {
+         this.nettyEventExecuter.start();
+     }
+
+     this.timer.scheduleAtFixedRate(new TimerTask() {
+
+         @Override
+         public void run() {
+             try {
+                 NettyRemotingServer.this.scanResponseTable();
+             } catch (Exception e) {
+                 log.error("scanResponseTable exception", e);
+             }
+         }
+     }, 1000 * 3, 1000);
+ }
+
+ /**
+  * Shuts the server down: cancels the housekeeping timer, then stops the boss group, the
+  * selector group, the event executer, the codec executor group and finally the public
+  * callback executor. Failures are logged, not propagated.
+  */
+ @Override
+ public void shutdown() {
+     try {
+         // timer is a final field initialised at declaration, so it is never null here.
+         this.timer.cancel();
+
+         this.eventLoopGroupBoss.shutdownGracefully();
+         this.eventLoopGroupSelector.shutdownGracefully();
+
+         if (null != this.nettyEventExecuter) {
+             this.nettyEventExecuter.shutdown();
+         }
+
+         if (null != this.defaultEventExecutorGroup) {
+             this.defaultEventExecutorGroup.shutdownGracefully();
+         }
+     } catch (Exception e) {
+         log.error("NettyRemotingServer shutdown exception, ", e);
+     }
+
+     if (null == this.publicExecutor) {
+         return;
+     }
+
+     try {
+         this.publicExecutor.shutdown();
+     } catch (Exception e) {
+         log.error("NettyRemotingServer shutdown exception, ", e);
+     }
+ }
+
+ /** Stores the RPC hook that {@code getRPCHook()} returns. */
+ @Override
+ public void registerRPCHook(final RPCHook rpcHook) {
+     this.rpcHook = rpcHook;
+ }
+
+ /**
+  * Registers the processor for a request code. When no executor is given, the public
+  * callback executor is paired with the processor instead.
+  */
+ @Override
+ public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
+     final ExecutorService executorThis = (executor != null) ? executor : this.publicExecutor;
+
+     this.processorTable.put(requestCode,
+         new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis));
+ }
+
+ /**
+  * Sets the default processor/executor pair, replacing any previous one.
+  * Unlike {@code registerProcessor}, the executor is stored exactly as given,
+  * with no fallback when it is {@code null}.
+  *
+  * @param processor the processor to store as default
+  * @param executor the executor to pair with it
+  */
+ @Override
+ public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
+     final Pair<NettyRequestProcessor, ExecutorService> defaultPair =
+         new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
+     this.defaultRequestProcessor = defaultPair;
+ }
+
+ /**
+  * Returns the value of this server's {@code port} field.
+  *
+  * @return the stored port number
+  */
+ @Override
+ public int localListenPort() {
+     return port;
+ }
+
+ /**
+  * Looks up the processor/executor pair stored for a request code.
+  *
+  * @param requestCode the request code to look up
+  * @return whatever the processor table holds for that code
+  */
+ @Override
+ public Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(int requestCode) {
+     return this.processorTable.get(requestCode);
+ }
+
+ /**
+  * Sends a request over the given channel and returns the response, by
+  * delegating to {@code invokeSyncImpl} with the same arguments.
+  *
+  * @param channel the channel to send on
+  * @param request the command to send
+  * @param timeoutMillis timeout value passed through to the implementation
+  * @return the command returned by {@code invokeSyncImpl}
+  */
+ @Override
+ public RemotingCommand invokeSync(final Channel channel, final RemotingCommand request, final long timeoutMillis)
+     throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
+     final RemotingCommand response = invokeSyncImpl(channel, request, timeoutMillis);
+     return response;
+ }
+
+ /**
+  * Sends a request over the given channel with a callback, by delegating to
+  * {@code invokeAsyncImpl} with the same arguments.
+  *
+  * @param channel the channel to send on
+  * @param request the command to send
+  * @param timeoutMillis timeout value passed through to the implementation
+  * @param invokeCallback the callback passed through to the implementation
+  */
+ @Override
+ public void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
+     throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
+     RemotingSendRequestException {
+     invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
+ }
+
+ @Override
+ public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis) throws InterruptedException,
+ RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ this.invokeOnewayImpl(channel, request, timeoutMillis);
+ }
+
+ /**
+  * Returns the channel event listener held by this server.
+  *
+  * @return the stored listener, which the rest of this class treats as
+  *         possibly {@code null}
+  */
+ @Override
+ public ChannelEventListener getChannelEventListener() {
+     return this.channelEventListener;
+ }
+
+ /**
+  * Returns the hook most recently stored through {@code registerRPCHook}.
+  *
+  * @return the stored hook reference
+  */
+ @Override
+ public RPCHook getRPCHook() {
+     return rpcHook;
+ }
+
+ /**
+  * Returns the executor used for callbacks, which on this server is the
+  * shared public executor.
+  *
+  * @return the public executor
+  */
+ @Override
+ public ExecutorService getCallbackExecutor() {
+     return publicExecutor;
+ }
+
+ /**
+  * Inbound handler that forwards every decoded {@link RemotingCommand} to the
+  * enclosing server's {@code processMessageReceived}.
+  */
+ class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+
+     @Override
+     protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+         // No handling happens here; the outer server does the dispatching.
+         NettyRemotingServer.this.processMessageReceived(ctx, msg);
+     }
+ }
+
+ /**
+  * Pipeline handler that logs channel lifecycle transitions and, when the
+  * enclosing server has a channel event listener, hands a matching
+  * {@link NettyEvent} to {@code putNettyEvent}. Channels are closed when they
+  * go all-idle or when an exception reaches this handler.
+  */
+ class NettyConnetManageHandler extends ChannelDuplexHandler {
+     @Override
+     public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY SERVER PIPELINE: channelRegistered {}", remoteAddress);
+         super.channelRegistered(ctx);
+     }
+
+
+     @Override
+     public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY SERVER PIPELINE: channelUnregistered, the channel[{}]", remoteAddress);
+         super.channelUnregistered(ctx);
+     }
+
+
+     /** Logs the activation, propagates it, then publishes a CONNECT event. */
+     @Override
+     public void channelActive(ChannelHandlerContext ctx) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY SERVER PIPELINE: channelActive, the channel[{}]", remoteAddress);
+         super.channelActive(ctx);
+
+         publishEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel());
+     }
+
+
+     /** Logs the deactivation, propagates it, then publishes a CLOSE event. */
+     @Override
+     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.info("NETTY SERVER PIPELINE: channelInactive, the channel[{}]", remoteAddress);
+         super.channelInactive(ctx);
+
+         publishEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel());
+     }
+
+
+     /**
+      * On an ALL_IDLE {@link IdleStateEvent}, closes the channel and publishes
+      * an IDLE event. Every user event, idle or not, is then passed on to the
+      * next handler.
+      */
+     @Override
+     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+         if (evt instanceof IdleStateEvent) {
+             final IdleStateEvent idleEvent = (IdleStateEvent) evt;
+             // IdleState is an enum, so identity comparison is the idiomatic check.
+             if (idleEvent.state() == IdleState.ALL_IDLE) {
+                 final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+                 log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
+                 RemotingUtil.closeChannel(ctx.channel());
+                 publishEvent(NettyEventType.IDLE, remoteAddress, ctx.channel());
+             }
+         }
+
+         ctx.fireUserEventTriggered(evt);
+     }
+
+
+     /**
+      * Logs the failure, publishes an EXCEPTION event and closes the channel.
+      * The exception is not forwarded further down the pipeline.
+      */
+     @Override
+     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+         final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
+         log.warn("NETTY SERVER PIPELINE: exceptionCaught {}", remoteAddress);
+         log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
+
+         // The event is published before the close, as in the other callbacks' ordering contract.
+         publishEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel());
+
+         RemotingUtil.closeChannel(ctx.channel());
+     }
+
+
+     /**
+      * Hands a {@link NettyEvent} to the enclosing server, but only when a
+      * channel event listener is configured; otherwise does nothing.
+      *
+      * @param type the kind of event to publish
+      * @param remoteAddress the remote address string recorded in the event
+      * @param channel the channel the event refers to
+      */
+     private void publishEvent(NettyEventType type, String remoteAddress, Channel channel) {
+         if (NettyRemotingServer.this.channelEventListener != null) {
+             NettyRemotingServer.this.putNettyEvent(new NettyEvent(type, remoteAddress, channel));
+         }
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
----------------------------------------------------------------------
diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
new file mode 100644
index 0000000..dae7f9e
--- /dev/null
+++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRequestProcessor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.remoting.netty;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.ChannelHandlerContext;
+
+
+/**
+ * Common remoting command processor.
+ *
+ * @author shijia.wxr
+ */
+public interface NettyRequestProcessor {
+    /**
+     * Processes one incoming command.
+     *
+     * @param ctx the Netty handler context the command arrived on
+     * @param request the command to process
+     * @return a {@link RemotingCommand}; presumably the response to send back,
+     *         to be confirmed against the caller
+     * @throws Exception if processing fails
+     */
+    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
+
+    /**
+     * NOTE(review): by its name this appears to tell the caller whether
+     * requests should currently be refused; confirm against the call sites.
+     *
+     * @return the implementation's reject flag
+     */
+    boolean rejectRequest();
+}