You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2019/01/02 16:45:28 UTC
[rocketmq] 04/14: Polish async invoke implementation,
prevent the block occurred during create channel
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch snode
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit e42970b090d37cce820f747d2f3dd77787f35130
Author: duhengforever <du...@gmail.com>
AuthorDate: Thu Dec 13 17:25:13 2018 +0800
Polish async invoke implementation, prevent the block occurred during create channel
---
.../rocketmq/remoting/RemotingClientFactory.java | 2 -
.../rocketmq/remoting/RemotingServerFactory.java | 2 -
.../rocketmq/remoting/common/RemotingUtil.java | 2 +-
.../rocketmq/remoting/netty/CodecHelper.java | 1 -
.../remoting/netty/NettyRemotingAbstract.java | 84 +++++++++++++++-------
.../rocketmq/remoting/netty/ResponseFuture.java | 6 +-
.../transport/NettyRemotingClientAbstract.java | 12 ++++
.../transport/NettyRemotingServerAbstract.java | 5 ++
.../org.apache.rocketmq.remoting.RemotingServer | 2 +-
9 files changed, 82 insertions(+), 34 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
index a766625..825d96b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClientFactory.java
@@ -18,9 +18,7 @@ public class RemotingClientFactory {
private static final String CLIENT_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingClient";
static {
- log.info("begin load client");
paths = ServiceProvider.loadPath(CLIENT_LOCATION);
- log.info("end load client, size:{}", paths.size());
}
public static RemotingClient createInstance(String protocol) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
index 125d4e0..e530d7a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServerFactory.java
@@ -19,9 +19,7 @@ public class RemotingServerFactory {
private static final String SERVER_LOCATION = "META-INF/service/org.apache.rocketmq.remoting.RemotingServer";
static {
- log.info("begin load server");
protocolPathMap = ServiceProvider.loadPath(SERVER_LOCATION);
- log.info("end load server, size:{}", protocolPathMap.size());
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
index 88008ab..7cc8915 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java
@@ -41,7 +41,7 @@ public class RemotingUtil {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private static boolean isLinuxPlatform = false;
private static boolean isWindowsPlatform = false;
- public static final String DEFAULT_PROTOCOL = "http2";
+ public static final String DEFAULT_PROTOCOL = "rocketmq";
public static final String REMOTING_CHARSET = "UTF-8";
static {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
index d0e3632..193cd39 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/CodecHelper.java
@@ -45,7 +45,6 @@ public class CodecHelper {
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
- System.out.println("cmd: " + cmd);
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 17053ff..cae2bf4 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -100,6 +100,11 @@ public abstract class NettyRemotingAbstract {
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
/**
+ * Used to execute tasks asynchronously for asyncInvokeMethod
+ */
+ private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
+
+ /**
* SSL context via which to create {@link SslHandler}.
*/
protected volatile SslContext sslContext;
@@ -445,38 +450,66 @@ public abstract class NettyRemotingAbstract {
}
}
- public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
+ abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException;
+
+ public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
+ final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final int opaque = request.getOpaque();
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
+ invokeAsyncImpl(null, channel, request, timeoutMillis, invokeCallback);
+ }
+
+ public void invokeAsyncImpl(final String addr, final RemotingCommand request,
+ final long timeoutMillis,
+ final InvokeCallback invokeCallback)
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ invokeAsyncImpl(addr, null, request, timeoutMillis, invokeCallback);
+ }
+
+ public void invokeAsyncImpl(final String addr, final Channel currentChannel, final RemotingCommand request,
+ final long timeoutMillis,
+ final InvokeCallback invokeCallback)
+ throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ final long beginStartTime = System.currentTimeMillis();
+ boolean acquired = semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
- final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
+ SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
- throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
+ once.release();
+ throw new RemotingTimeoutException("InvokeAsyncImpl call timeout");
}
-
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
- this.responseTable.put(opaque, responseFuture);
- try {
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
+ final int opaque = request.getOpaque();
+ final ResponseFuture responseFuture = new ResponseFuture(currentChannel, opaque, timeoutMillis, invokeCallback, once);
+ responseTable.put(opaque, responseFuture);
+ asyncExecuteService.submit(new Runnable() {
+ @Override
+ public void run() {
+ Channel channel = currentChannel;
+ final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel);
+ try {
+ if (channel == null) {
+ channel = getAndCreateChannel(addr, timeoutMillis);
+ responseFuture.setProcessChannel(channel);
}
+ channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f) throws Exception {
+ if (f.isSuccess()) {
+ responseFuture.setSendRequestOK(true);
+ return;
+ }
+ requestFail(opaque);
+ log.warn("send a request command to channel <{}> failed.", remotingAddr);
+ }
+ });
+ } catch (Exception ex) {
+ responseFuture.release();
requestFail(opaque);
- log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
+ log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", ex);
}
- });
- } catch (Exception e) {
- responseFuture.release();
- log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
- throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
- }
+ }
+ });
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
@@ -527,7 +560,8 @@ public abstract class NettyRemotingAbstract {
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
+ throws
+ InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
@@ -625,6 +659,4 @@ public abstract class NettyRemotingAbstract {
}
}
-
-
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 5f4c8c6..bffe602 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class ResponseFuture {
private final int opaque;
- private final Channel processChannel;
+ private Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
@@ -121,6 +121,10 @@ public class ResponseFuture {
return processChannel;
}
+ public void setProcessChannel(Channel processChannel) {
+ this.processChannel = processChannel;
+ }
+
@Override
public String toString() {
return "ResponseFuture [responseCommand=" + responseCommand
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
index 6dab218..d53d40b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java
@@ -32,6 +32,7 @@ import java.util.Random;
import java.util.Timer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -39,11 +40,17 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.util.ThreadUtils;
public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
@@ -53,6 +60,10 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
private final Lock lockNamesrvChannel = new ReentrantLock();
+ /**
+ * Used to execute tasks asynchronously for asyncInvokeMethod
+ */
+ private ExecutorService asyncExecuteService = ThreadUtils.newFixedThreadPool(5, 10000, "asyncExecute", false);
private final Lock lockChannelTables = new ReentrantLock();
private static final long LOCK_TIMEOUT_MILLIS = 3000;
@@ -169,6 +180,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
}
}
+ @Override
protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel(timeout);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
index e8d8471..cec0086 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java
@@ -1,5 +1,6 @@
package org.apache.rocketmq.remoting.transport;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
@@ -90,4 +91,8 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract
RemotingUtil.closeChannel(ctx.channel());
}
}
+
+ @Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
+ return null;
+ }
}
diff --git a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
index 5079c88..9f70dce 100644
--- a/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
+++ b/remoting/src/main/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer
@@ -1,2 +1,2 @@
rocketmq=org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingServer
-http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl
\ No newline at end of file
+http2=org.apache.rocketmq.remoting.transport.http2.Http2ServerImpl