You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by do...@apache.org on 2017/06/08 07:54:34 UTC

[13/50] [abbrv] incubator-rocketmq git commit: Add javadoc to NettyRemotingAbstract class and several other trivial cleanups.

Add javadoc to NettyRemotingAbstract class and several other trivial cleanups.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6609c866
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6609c866
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6609c866

Branch: refs/heads/release-4.1.0-incubating
Commit: 6609c86650917ebfb5bd12a4bd8b1bcf9c477759
Parents: 6a9628b
Author: Zhanhui Li <li...@apache.org>
Authored: Tue Apr 25 22:58:14 2017 +0800
Committer: Zhanhui Li <li...@apache.org>
Committed: Tue Apr 25 22:58:14 2017 +0800

----------------------------------------------------------------------
 .../remoting/netty/NettyRemotingAbstract.java   | 93 ++++++++++++++++++--
 .../remoting/netty/NettyRemotingClient.java     | 19 ++--
 .../remoting/netty/NettyRemotingServer.java     | 22 ++---
 3 files changed, 106 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 83eeb02..cddab3d 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -48,32 +48,84 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public abstract class NettyRemotingAbstract {
+
+    /**
+     * Remoting logger instance.
+     */
     private static final Logger PLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
+    /**
+     * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
+     */
     protected final Semaphore semaphoreOneway;
 
+    /**
+     * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
+     */
     protected final Semaphore semaphoreAsync;
 
+    /**
+     * This map caches all on-going requests.
+     */
     protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable =
         new ConcurrentHashMap<Integer, ResponseFuture>(256);
 
+    /**
+     * This container holds all processors per request code, i.e., for each incoming request, we may look up the
+     * corresponding processor in this map to handle the request.
+     */
     protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
         new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
-    protected final NettyEventExecuter nettyEventExecuter = new NettyEventExecuter();
 
+    /**
+     * Executor to feed netty events to user defined {@link ChannelEventListener}.
+     */
+    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
+
+    /**
+     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
+     */
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
+    /**
+     * Constructor, specifying capacity of one-way and asynchronous semaphores.
+     * @param permitsOneway Number of permits for one-way requests.
+     * @param permitsAsync Number of permits for asynchronous requests.
+     */
     public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
         this.semaphoreOneway = new Semaphore(permitsOneway, true);
         this.semaphoreAsync = new Semaphore(permitsAsync, true);
     }
 
+    /**
+     * Custom channel event listener.
+     * @return custom channel event listener if defined; null otherwise.
+     */
     public abstract ChannelEventListener getChannelEventListener();
 
+    /**
+     * Put a netty event to the executor.
+     * @param event Netty event instance.
+     */
     public void putNettyEvent(final NettyEvent event) {
-        this.nettyEventExecuter.putNettyEvent(event);
+        this.nettyEventExecutor.putNettyEvent(event);
     }
 
+    /**
+     * Entry of incoming command processing.
+     *
+     * <p>
+     *     <strong>Note:</strong>
+     *     The incoming remoting command may be
+     *     <ul>
+     *         <li>An inquiry request from a remote peer component;</li>
+     *         <li>A response to a previous request issued by this very participant.</li>
+     *     </ul>
+     * </p>
+     * @param ctx Channel handler context.
+     * @param msg incoming remoting command.
+     * @throws Exception if any error occurs while processing the incoming command.
+     */
     public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
         final RemotingCommand cmd = msg;
         if (cmd != null) {
@@ -90,6 +142,11 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    /**
+     * Process incoming request command issued by remote peer.
+     * @param ctx channel handler context.
+     * @param cmd request command.
+     */
     public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
         final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
         final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
@@ -175,6 +232,11 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    /**
+     * Process a response from the remote peer to a previously issued request.
+     * @param ctx channel handler context.
+     * @param cmd response command instance.
+     */
     public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
         final int opaque = cmd.getOpaque();
         final ResponseFuture responseFuture = responseTable.get(opaque);
@@ -196,7 +258,10 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-    //execute callback in callback executor. If callback executor is null, run directly in current thread
+    /**
+     * Execute callback in callback executor. If callback executor is null, run directly in current thread.
+     * @param responseFuture the response future whose callback is to be executed.
+     */
     private void executeInvokeCallback(final ResponseFuture responseFuture) {
         boolean runInThisThread = false;
         ExecutorService executor = this.getCallbackExecutor();
@@ -229,10 +294,24 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    /**
+     * Custom RPC hook.
+     * @return RPC hook if specified; null otherwise.
+     */
     public abstract RPCHook getRPCHook();
 
-    abstract public ExecutorService getCallbackExecutor();
-
+    /**
+     * This method specifies thread pool to use while invoking callback methods.
+     * @return Dedicated thread pool instance if specified; or null if the callback is supposed to be executed in the
+     * netty event-loop thread.
+     */
+    public abstract ExecutorService getCallbackExecutor();
+
+    /**
+     * <p>
+     *    This method is periodically invoked to scan and expire deprecated requests.
+     * </p>
+     */
     public void scanResponseTable() {
         final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
         Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
@@ -386,7 +465,7 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-    class NettyEventExecuter extends ServiceThread {
+    class NettyEventExecutor extends ServiceThread {
         private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<NettyEvent>();
         private final int maxSize = 10000;
 
@@ -436,7 +515,7 @@ public abstract class NettyRemotingAbstract {
 
         @Override
         public String getServiceName() {
-            return NettyEventExecuter.class.getSimpleName();
+            return NettyEventExecutor.class.getSimpleName();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 26088aa..52ca47e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -172,7 +172,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }, 1000 * 3, 1000);
 
         if (this.channelEventListener != null) {
-            this.nettyEventExecuter.start();
+            this.nettyEventExecutor.start();
         }
     }
 
@@ -189,8 +189,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
             this.eventLoopGroupWorker.shutdownGracefully();
 
-            if (this.nettyEventExecuter != null) {
-                this.nettyEventExecuter.shutdown();
+            if (this.nettyEventExecutor != null) {
+                this.nettyEventExecutor.shutdown();
             }
 
             if (this.defaultEventExecutorGroup != null) {
@@ -586,7 +586,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
             processMessageReceived(ctx, msg);
-
         }
     }
 
@@ -594,8 +593,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         @Override
         public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
             ChannelPromise promise) throws Exception {
-            final String local = localAddress == null ? "UNKNOWN" : localAddress.toString();
-            final String remote = remoteAddress == null ? "UNKNOWN" : remoteAddress.toString();
+            final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
+            final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
             log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
 
             super.connect(ctx, remoteAddress, localAddress, promise);
@@ -613,7 +612,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             super.disconnect(ctx, promise);
 
             if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
             }
         }
 
@@ -625,7 +624,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             super.close(ctx, promise);
 
             if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
             }
         }
 
@@ -639,7 +638,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     closeChannel(ctx.channel());
                     if (NettyRemotingClient.this.channelEventListener != null) {
                         NettyRemotingClient.this
-                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                     }
                 }
             }
@@ -654,7 +653,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
             log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
             closeChannel(ctx.channel());
             if (NettyRemotingClient.this.channelEventListener != null) {
-                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6609c866/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 6a6df37..d8d9b65 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -160,7 +160,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                             new NettyEncoder(),
                             new NettyDecoder(),
                             new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                            new NettyConnetManageHandler(),
+                            new NettyConnectManageHandler(),
                             new NettyServerHandler());
                     }
                 });
@@ -178,7 +178,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
 
         if (this.channelEventListener != null) {
-            this.nettyEventExecuter.start();
+            this.nettyEventExecutor.start();
         }
 
         this.timer.scheduleAtFixedRate(new TimerTask() {
@@ -205,8 +205,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
             this.eventLoopGroupSelector.shutdownGracefully();
 
-            if (this.nettyEventExecuter != null) {
-                this.nettyEventExecuter.shutdown();
+            if (this.nettyEventExecutor != null) {
+                this.nettyEventExecutor.shutdown();
             }
 
             if (this.defaultEventExecutorGroup != null) {
@@ -297,7 +297,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
-    class NettyConnetManageHandler extends ChannelDuplexHandler {
+    class NettyConnectManageHandler extends ChannelDuplexHandler {
         @Override
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
             final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
@@ -319,7 +319,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             super.channelActive(ctx);
 
             if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remoteAddress, ctx.channel()));
             }
         }
 
@@ -330,21 +330,21 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             super.channelInactive(ctx);
 
             if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
             }
         }
 
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
             if (evt instanceof IdleStateEvent) {
-                IdleStateEvent evnet = (IdleStateEvent) evt;
-                if (evnet.state().equals(IdleState.ALL_IDLE)) {
+                IdleStateEvent event = (IdleStateEvent) evt;
+                if (event.state().equals(IdleState.ALL_IDLE)) {
                     final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                     log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress);
                     RemotingUtil.closeChannel(ctx.channel());
                     if (NettyRemotingServer.this.channelEventListener != null) {
                         NettyRemotingServer.this
-                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress.toString(), ctx.channel()));
+                            .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                     }
                 }
             }
@@ -359,7 +359,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
             log.warn("NETTY SERVER PIPELINE: exceptionCaught exception.", cause);
 
             if (NettyRemotingServer.this.channelEventListener != null) {
-                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress.toString(), ctx.channel()));
+                NettyRemotingServer.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
             }
 
             RemotingUtil.closeChannel(ctx.channel());