You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:28 UTC

[rocketmq] 04/14: Polish async invoke implementation, prevent the block occurred during create channel

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit e42970b090d37cce820f747d2f3dd77787f35130
Author: duhengforever <du...@gmail.com>
AuthorDate: Thu Dec 13 17:25:13 2018 +0800

    Polish async invoke implementation, prevent the block occurred during create channel
---
 .../rocketmq/remoting/RemotingClientFactory.java   |  2 -
 .../rocketmq/remoting/RemotingServerFactory.java   |  2 -
 .../rocketmq/remoting/common/RemotingUtil.java     |  2 +-
 .../rocketmq/remoting/netty/CodecHelper.java       |  1 -
 .../remoting/netty/NettyRemotingAbstract.java      | 84 +++++++++++++++-------
 .../rocketmq/remoting/netty/ResponseFuture.java    |  6 +-
 .../transport/NettyRemotingClientAbstract.java     | 12 ++++
 .../transport/NettyRemotingServerAbstract.java     |  5 ++
 .../org.apache.rocketmq.remoting.RemotingServer    |  2 +-
 9 files changed, 82 insertions(+), 34 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index a766625..825d96b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -18,9 +18,7 @@ public class RemotingClientFactory {
     private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
 
     static {
-        log.info("begin load client");
         paths = ServiceProvider.loadPath(CLIENT_LOCATION);
-        log.info("end load client, size:{}", paths.size());
     }
 
     public static RemotingClient createInstance(String protocol) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index 125d4e0..e530d7a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -19,9 +19,7 @@ public class RemotingServerFactory {
     private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
 
     static {
-        log.info("begin load server");
         protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
-        log.info("end load server, size:{}", protocolPathMap.size());
     }
 
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 88008ab..7cc8915 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -41,7 +41,7 @@ public class RemotingUtil {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
     private static boolean isLinuxPlatform = false;
     private static boolean isWindowsPlatform = false;
-    public static final String DEFAULT_PROTOCOL = "http2";
+    public static final String DEFAULT_PROTOCOL = "rocketmq";
     public static final String REMOTING_CHARSET = "UTF-8";
 
     static {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
index d0e3632..193cd39 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
@@ -45,7 +45,6 @@ public class CodecHelper {
         byte[] headerData = new byte[headerLength];
         byteBuffer.get(headerData);
         RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
-        System.out.println("cmd: " + cmd);
         int bodyLength = length - 4 - headerLength;
         byte[] bodyData = null;
         if (bodyLength > 0) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 17053ff..cae2bf4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -100,6 +100,11 @@ public abstract class NettyRemotingAbstract {
     protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
 
     /**
+     * Used for async execute task for aysncInvokeMethod
+     */
+    private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
+
+    /**
      * SSL context via which to create {@link SslHandler}.
      */
     protected volatile SslContext sslContext;
@@ -445,38 +450,66 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+    abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException;
+
+    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
+        final long timeoutMillis,
         final InvokeCallback invokeCallback)
         throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
-        long beginStartTime = System.currentTimeMillis();
-        final int opaque = request.getOpaque();
-        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+        invokeAsyncImpl(null, channel, request, timeoutMillis, invokeCallback);
+    }
+
+    public void invokeAsyncImpl(final String addr, final RemotingCommand request,
+        final long timeoutMillis,
+        final InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        invokeAsyncImpl(addr, null, request, timeoutMillis, invokeCallback);
+    }
+
+    public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
+        final long timeoutMillis,
+        final InvokeCallback invokeCallback)
+        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        final long beginStartTime = System.currentTimeMillis();
+        boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
-            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+            SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphoreAsync);
             long costTime = System.currentTimeMillis() - beginStartTime;
             if (timeoutMillis < costTime) {
-                throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
+                once.release();
+                throw new RemotingTimeoutException("InvokeAsyncImpl call timeout");
             }
-
-            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
-            this.responseTable.put(opaque, responseFuture);
-            try {
-                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture f) throws Exception {
-                        if (f.isSuccess()) {
-                            responseFuture.setSendRequestOK(true);
-                            return;
+            final int opaque = request.getOpaque();
+            final ResponseFuture responseFuture = new ResponseFuture(currentChannel, opaque, timeoutMillis, invokeCallback, once);
+            responseTable.put(opaque, responseFuture);
+            asyncExecuteService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Channel channel = currentChannel;
+                    final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel);
+                    try {
+                        if (channel == null) {
+                            channel = getAndCreateChannel(addr, timeoutMillis);
+                            responseFuture.setProcessChannel(channel);
                         }
+                        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+                            @Override
+                            public void operationComplete(ChannelFuture f) throws Exception {
+                                if (f.isSuccess()) {
+                                    responseFuture.setSendRequestOK(true);
+                                    return;
+                                }
+                                requestFail(opaque);
+                                log.warn("send a request command to channel <{}> failed.", remotingAddr);
+                            }
+                        });
+                    } catch (Exception ex) {
+                        responseFuture.release();
                         requestFail(opaque);
-                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+                        log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", ex);
                     }
-                });
-            } catch (Exception e) {
-                responseFuture.release();
-                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
-                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
-            }
+                }
+            });
         } else {
             if (timeoutMillis <= 0) {
                 throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
@@ -527,7 +560,8 @@ public abstract class NettyRemotingAbstract {
     }
 
     public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
-        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+        throws
+        InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
         request.markOnewayRPC();
         boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
         if (acquired) {
@@ -625,6 +659,4 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
-
-
 }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 5f4c8c6..bffe602 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class ResponseFuture {
     private final int opaque;
-    private final Channel processChannel;
+    private Channel processChannel;
     private final long timeoutMillis;
     private final InvokeCallback invokeCallback;
     private final long beginTimestamp = System.currentTimeMillis();
@@ -121,6 +121,10 @@ public class ResponseFuture {
         return processChannel;
     }
 
+    public void setProcessChannel(Channel processChannel) {
+        this.processChannel = processChannel;
+    }
+
     @Override
     public String toString() {
         return "ResponseFuture [responseCommand=" + responseCommand
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
index 6dab218..d53d40b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -32,6 +32,7 @@ import java.util.Random;
 import java.util.Timer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -39,11 +40,17 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyEvent;
 import org.apache.rocketmq.remoting.netty.NettyEventType;
 import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
 
 public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -53,6 +60,10 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
     private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
     private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
     private final Lock lockNamesrvChannel = new ReentrantLock();
+    /**
+     * Used for async execute task for aysncInvokeMethod
+     */
+    private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
 
     private final Lock lockChannelTables = new ReentrantLock();
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -169,6 +180,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
         }
     }
 
+    @Override
     protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
         if (null == addr) {
             return getAndCreateNameserverChannel(timeout);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
index e8d8471..cec0086 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -1,5 +1,6 @@
 package org.apache.rocketmq.remoting.transport;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.timeout.IdleState;
@@ -90,4 +91,8 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract
             RemotingUtil.closeChannel(ctx.channel());
         }
     }
+
+    @Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
+        return null;
+    }
 }
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
index 5079c88..9f70dce 100644
--- a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
@@ -1,2 +1,2 @@
 rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
-http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl