You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/16 02:04:18 UTC
tajo git commit: TAJO-1391: RpcConnectionPool should check reference
counter of connection before close
Repository: tajo
Updated Branches:
refs/heads/master e1e38e231 -> 0dc7d6807
TAJO-1391: RpcConnectionPool should check reference counter of connection before close
Closes #412
Signed-off-by: Jihun Kang <ji...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dc7d680
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dc7d680
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dc7d680
Branch: refs/heads/master
Commit: 0dc7d68071dcf7c9d01dde8ed7598ca422e4c50c
Parents: e1e38e2
Author: navis.ryu <na...@apache.org>
Authored: Mon Mar 16 10:03:10 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Mon Mar 16 10:03:10 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../apache/tajo/client/SessionConnection.java | 2 +-
.../org/apache/tajo/rpc/AsyncRpcClient.java | 63 ++------
.../org/apache/tajo/rpc/BlockingRpcClient.java | 68 ++-------
.../org/apache/tajo/rpc/NettyClientBase.java | 148 ++++++++++++-------
.../org/apache/tajo/rpc/RpcConnectionPool.java | 112 +++++++-------
.../main/java/org/apache/tajo/rpc/RpcUtils.java | 54 +++++++
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 32 ++--
.../org/apache/tajo/rpc/TestBlockingRpc.java | 34 +++--
9 files changed, 272 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 84a7571..9d2cd14 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-1391: RpcConnectionPool should check reference counter of connection
+ before close. (Contributed by navis, Committed by jihun)
+
TAJO-1383: Improve broadcast table cache. (jinho)
TAJO-1374: Support multi-bytes delimiter for CSV file.
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index d05d3b1..d24e7b3 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -128,7 +128,7 @@ public class SessionConnection implements Closeable {
if(!closed.get()){
try {
return connPool.getConnection(serviceTracker.getClientServiceAddress(),
- TajoMasterClientProtocol.class, false).isActive();
+ TajoMasterClientProtocol.class, false).isConnected();
} catch (Throwable e) {
return false;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 5845229..1ea9fb1 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -24,6 +24,7 @@ import com.google.protobuf.*;
import io.netty.channel.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
@@ -34,62 +35,33 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
public class AsyncRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
- private final ChannelInitializer<Channel> initializer;
- private final ProxyRpcChannel rpcChannel;
-
- private final AtomicInteger sequence = new AtomicInteger(0);
private final Map<Integer, ResponseCallback> requests =
new ConcurrentHashMap<Integer, ResponseCallback>();
- private final Class<?> protocol;
private final Method stubMethod;
-
- private RpcConnectionKey key;
+ private final ProxyRpcChannel rpcChannel;
+ private final ClientChannelInboundHandler inboundHandler;
/**
* Intentionally make this method package-private, avoiding user directly
* new an instance through this constructor.
*/
- AsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, int retries)
- throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
-
- this.protocol = protocol;
- String serviceClassName = protocol.getName() + "$"
- + protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
-
- initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(),
- RpcResponse.getDefaultInstance());
- super.init(addr, initializer, retries);
+ AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ super(rpcConnectionKey, retries);
+ stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
rpcChannel = new ProxyRpcChannel();
- this.key = new RpcConnectionKey(addr, protocol, true);
- }
-
- @Override
- public RpcConnectionKey getKey() {
- return key;
+ inboundHandler = new ClientChannelInboundHandler();
+ init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
}
@Override
public <T> T getStub() {
- try {
- return (T) stubMethod.invoke(null, rpcChannel);
- } catch (Exception e) {
- throw new RemoteException(e.getMessage(), e);
- }
- }
-
- public RpcChannel getRpcChannel() {
- return this.rpcChannel;
+ return getStub(stubMethod, rpcChannel);
}
protected void sendExceptions(String message) {
@@ -113,17 +85,6 @@ public class AsyncRpcClient extends NettyClientBase {
}
private class ProxyRpcChannel implements RpcChannel {
- private final ClientChannelInboundHandler handler;
-
- public ProxyRpcChannel() {
- this.handler = getChannel().pipeline()
- .get(ClientChannelInboundHandler.class);
-
- if (handler == null) {
- throw new IllegalArgumentException("Channel does not have " +
- "proper handler");
- }
- }
public void callMethod(final MethodDescriptor method,
final RpcController controller,
@@ -135,7 +96,7 @@ public class AsyncRpcClient extends NettyClientBase {
Message rpcRequest = buildRequest(nextSeqId, method, param);
- handler.registerCallback(nextSeqId,
+ inboundHandler.registerCallback(nextSeqId,
new ResponseCallback(controller, responseType, done));
ChannelPromise channelPromise = getChannel().newPromise();
@@ -144,7 +105,7 @@ public class AsyncRpcClient extends NettyClientBase {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- handler.exceptionCaught(null, new ServiceException(future.cause()));
+ inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
}
}
});
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 4ec5718..6a90330 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.*;
import io.netty.util.concurrent.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
@@ -35,63 +36,33 @@ import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
public class BlockingRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(RpcProtos.class);
- private final ChannelInitializer<Channel> initializer;
- private final ProxyRpcChannel rpcChannel;
-
- private final AtomicInteger sequence = new AtomicInteger(0);
private final Map<Integer, ProtoCallFuture> requests =
new ConcurrentHashMap<Integer, ProtoCallFuture>();
- private final Class<?> protocol;
private final Method stubMethod;
-
- private RpcConnectionKey key;
+ private final ProxyRpcChannel rpcChannel;
+ private final ChannelInboundHandlerAdapter inboundHandler;
/**
* Intentionally make this method package-private, avoiding user directly
* new an instance through this constructor.
*/
- BlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, int retries)
- throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
-
- this.protocol = protocol;
- String serviceClassName = protocol.getName() + "$"
- + protocol.getSimpleName() + "Service";
- Class<?> serviceClass = Class.forName(serviceClassName);
- stubMethod = serviceClass.getMethod("newBlockingStub",
- BlockingRpcChannel.class);
-
- initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance());
- super.init(addr, initializer, retries);
+ BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ super(rpcConnectionKey, retries);
+ stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
rpcChannel = new ProxyRpcChannel();
-
- this.key = new RpcConnectionKey(addr, protocol, false);
- }
-
- @Override
- public RpcConnectionKey getKey() {
- return key;
+ inboundHandler = new ClientChannelInboundHandler();
+ init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
}
@Override
public <T> T getStub() {
- try {
- return (T) stubMethod.invoke(null, rpcChannel);
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public BlockingRpcChannel getBlockingRpcChannel() {
- return this.rpcChannel;
+ return getStub(stubMethod, rpcChannel);
}
@Override
@@ -106,19 +77,6 @@ public class BlockingRpcClient extends NettyClientBase {
private class ProxyRpcChannel implements BlockingRpcChannel {
- private final ClientChannelInboundHandler handler;
-
- public ProxyRpcChannel() {
-
- this.handler = getChannel().pipeline().
- get(ClientChannelInboundHandler.class);
-
- if (handler == null) {
- throw new IllegalArgumentException("Channel does not have " +
- "proper handler");
- }
- }
-
@Override
public Message callBlockingMethod(final MethodDescriptor method,
final RpcController controller,
@@ -139,7 +97,7 @@ public class BlockingRpcClient extends NettyClientBase {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- handler.exceptionCaught(null, new ServiceException(future.cause()));
+ inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
}
}
});
@@ -174,7 +132,7 @@ public class BlockingRpcClient extends NettyClientBase {
}
private String getErrorMessage(String message) {
- if(protocol != null && getChannel() != null) {
+ if(getChannel() != null) {
return protocol.getName() +
"(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
getChannel().remoteAddress()) + "): " + message;
@@ -184,7 +142,7 @@ public class BlockingRpcClient extends NettyClientBase {
}
private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
- if(protocol != null && getChannel() != null) {
+ if(getChannel() != null) {
return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 7b52178..7dfc5a2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,9 +20,9 @@ package org.apache.tajo.rpc;
import io.netty.channel.*;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -30,77 +30,125 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
+import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class NettyClientBase implements Closeable {
- private static Log LOG = LogFactory.getLog(NettyClientBase.class);
- private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
+ private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
+ private static final int CONNECTION_TIMEOUT = 60000; // 60 sec
private static final long PAUSE = 1000; // 1 sec
- private int numRetries;
- protected Bootstrap bootstrap;
- private ChannelFuture channelFuture;
+ private final int numRetries;
- public NettyClientBase() {
- }
+ private Bootstrap bootstrap;
+ private volatile ChannelFuture channelFuture;
- public abstract <T> T getStub();
- public abstract RpcConnectionPool.RpcConnectionKey getKey();
-
- public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer,
- int numRetries) throws ConnectTimeoutException {
+ protected final Class<?> protocol;
+ protected final AtomicInteger sequence = new AtomicInteger(0);
+
+ private final RpcConnectionKey key;
+ private final AtomicInteger counter = new AtomicInteger(0); // reference counter
+
+ public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+ throws ClassNotFoundException, NoSuchMethodException {
+ this.key = rpcConnectionKey;
+ this.protocol = rpcConnectionKey.protocolClass;
this.numRetries = numRetries;
-
- init(addr, initializer);
}
- public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer)
- throws ConnectTimeoutException {
+ // should be called from sub class
+ protected void init(ChannelInitializer<Channel> initializer) {
this.bootstrap = new Bootstrap();
this.bootstrap
.channel(NioSocketChannel.class)
.handler(initializer)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_REUSEADDR, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
.option(ChannelOption.SO_RCVBUF, 1048576 * 10)
.option(ChannelOption.TCP_NODELAY, true);
+ }
- connect(addr);
+ public RpcConnectionPool.RpcConnectionKey getKey() {
+ return key;
}
- private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+ protected final Class<?> getServiceClass() throws ClassNotFoundException {
+ String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+ return Class.forName(serviceClassName);
+ }
+ @SuppressWarnings("unchecked")
+ protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
+ try {
+ return (T) stubMethod.invoke(null, rpcChannel);
+ } catch (Exception e) {
+ throw new RemoteException(e.getMessage(), e);
+ }
+ }
+
+ public abstract <T> T getStub();
+
+ /**
+ * Takes a reference on this client after making sure its channel is usable.
+ * Delegates to checkConnection(timeout); the reference counter is incremented
+ * only when that check succeeds.
+ *
+ * @param timeout passed through to checkConnection, which waits on it in milliseconds
+ * @return true if the connection check passed and the counter was incremented;
+ * false if the check failed, in which case the counter is left unchanged
+ */
+ public boolean acquire(long timeout) {
+ if (!checkConnection(timeout)) {
+ return false;
+ }
+ counter.incrementAndGet();
+ return true;
+ }
+
+ /**
+ * Drops one reference by decrementing the counter.
+ *
+ * NOTE(review): nothing here prevents more release() calls than successful
+ * acquire() calls; the counter would then go negative and this method would
+ * return false for those calls. Confirm callers always pair the two.
+ *
+ * @return true only when the counter is exactly zero after the decrement
+ */
+ public boolean release() {
+ return counter.decrementAndGet() == 0;
+ }
+
+ /**
+ * Reports whether the channel is connected, trying to connect when it is not.
+ *
+ * @param timeout handed to handleConnectionInternally for the connect attempt
+ * @return true immediately when isConnected() already holds; otherwise the
+ * result of handleConnectionInternally
+ */
+ private boolean checkConnection(long timeout) {
+ if (isConnected()) {
+ return true;
+ }
+
+ InetSocketAddress addr = key.addr;
+ // the key may carry an unresolved address; rebuild it from host name and port before connecting
+ if (addr.isUnresolved()) {
+ addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+ }
+
+ return handleConnectionInternally(addr, timeout);
+ }
+
+ private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+ LOG.warn("Try to connect : " + address);
this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
.connect(address)
.addListener(listener);
}
-
- private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
- final CountDownLatch latch = new CountDownLatch(1);
- GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch);
- connectUsingNetty(addr, listener);
+
+ // first attendant kicks connection
+ private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>();
+
+ private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) {
+ final CountDownLatch ticket = new CountDownLatch(1);
+ final CountDownLatch granted = connect.check(ticket);
+
+ if (ticket == granted) {
+ connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
+ }
try {
- latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS);
+ granted.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
+ // ignore
}
- if (!channelFuture.isSuccess()) {
- throw new ConnectTimeoutException("Connect error to " + addr +
- " caused by " + ExceptionUtils.getMessage(channelFuture.cause()));
- }
- }
+ boolean success = channelFuture.isSuccess();
- public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
- if(addr.isUnresolved()){
- addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+ if (granted.getCount() == 0) {
+ connect.clear(granted);
}
- handleConnectionInternally(addr);
+ return success;
}
class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
@@ -142,32 +190,26 @@ public abstract class NettyClientBase implements Closeable {
}
}
- public boolean isActive() {
- return getChannel().isActive();
+ public Channel getChannel() {
+ return channelFuture == null ? null : channelFuture.channel();
}
- public InetSocketAddress getRemoteAddress() {
- if (channelFuture == null || channelFuture.channel() == null) {
- return null;
- }
- return (InetSocketAddress) channelFuture.channel().remoteAddress();
+ public boolean isConnected() {
+ Channel channel = getChannel();
+ return channel != null && channel.isOpen() && channel.isActive();
}
- public Channel getChannel() {
- return channelFuture.channel();
+ public SocketAddress getRemoteAddress() {
+ Channel channel = getChannel();
+ return channel == null ? null : channel.remoteAddress();
}
@Override
public void close() {
- if (channelFuture != null && getChannel().isActive()) {
- getChannel().close();
- }
-
- if (this.bootstrap != null) {
- InetSocketAddress address = getRemoteAddress();
- if (address != null) {
- LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
- }
+ Channel channel = getChannel();
+ if (channel != null && channel.isOpen()) {
+ LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+ channel.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 43feeb1..6d1f479 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -18,13 +18,9 @@
package org.apache.tajo.rpc;
-import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import io.netty.channel.ConnectTimeoutException;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.logging.CommonsLoggerFactory;
import io.netty.util.internal.logging.InternalLoggerFactory;
@@ -37,7 +33,6 @@ public class RpcConnectionPool {
private Map<RpcConnectionKey, NettyClientBase> connections =
new HashMap<RpcConnectionKey, NettyClientBase>();
- private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static RpcConnectionPool instance;
private final Object lockObject = new Object();
@@ -59,103 +54,101 @@ public class RpcConnectionPool {
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
NettyClientBase client;
if(rpcConnectionKey.asyncMode) {
- client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
- RPC_RETRIES);
+ client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
} else {
- client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
- RPC_RETRIES);
+ client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
}
- accepted.add(client.getChannel());
return client;
}
+ public static final long DEFAULT_TIMEOUT = 3000;
+ public static final long DEFAULT_INTERVAL = 500;
+
public NettyClientBase getConnection(InetSocketAddress addr,
Class<?> protocolClass, boolean asyncMode)
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+ return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
+ }
+
+ public NettyClientBase getConnection(InetSocketAddress addr,
+ Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
- NettyClientBase client = connections.get(key);
- if (client == null) {
- synchronized (lockObject){
+ RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
+ for (; !timer.isTimedOut(); timer.elapsed()) {
+ NettyClientBase client;
+ synchronized (lockObject) {
client = connections.get(key);
if (client == null) {
- client = makeConnection(key);
- connections.put(key, client);
+ connections.put(key, client = makeConnection(key));
}
}
+ if (client.acquire(timer.remaining())) {
+ return client;
+ }
+ timer.interval(interval);
}
- if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
- LOG.warn("Try to reconnect : " + addr);
- client.connect(addr);
- }
- return client;
+ throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
}
public void releaseConnection(NettyClientBase client) {
- if (client == null) return;
-
- try {
- synchronized (lockObject) {
- if (!client.getChannel().isOpen()) {
- connections.remove(client.getKey());
- client.close();
- }
- }
-
- if(LOG.isDebugEnabled()) {
- LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size());
-
- }
- } catch (Exception e) {
- LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
- }
+ release(client, false);
}
public void closeConnection(NettyClientBase client) {
+ release(client, true);
+ }
+
+ private void release(NettyClientBase client, boolean close) {
if (client == null) {
return;
}
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Close connection [" + client.getKey() + "]");
+ }
try {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Close connection [" + client.getKey() + "]");
- }
-
- synchronized (lockObject) {
- connections.remove(client.getKey());
+ if (returnToPool(client, close)) {
client.close();
}
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Current Connections [" + connections.size() + "]");
+ }
} catch (Exception e) {
LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
}
}
- public synchronized void close() {
+ /**
+ * Gives back one reference to the client and decides whether it must be closed.
+ * Runs under lockObject. client.release() is always invoked; the client is
+ * removed from the connections map only when release() reports the last
+ * reference and either a close was requested or the client is no longer connected.
+ *
+ * @param client the client being handed back
+ * @param close true when the caller wants the connection closed once unreferenced
+ * @return true if the client was removed from the pool and should be closed by the caller
+ */
+ private boolean returnToPool(NettyClientBase client, boolean close) {
+ synchronized (lockObject) {
+ // release() is the left operand, so the reference is dropped even when the right side is false
+ if (client.release() && (close || !client.isConnected())) {
+ connections.remove(client.getKey());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void close() {
if(LOG.isDebugEnabled()) {
LOG.debug("Pool Closed");
}
- synchronized(lockObject) {
- for(NettyClientBase eachClient: connections.values()) {
+
+ synchronized (lockObject) {
+ for (NettyClientBase eachClient : connections.values()) {
try {
eachClient.close();
} catch (Exception e) {
LOG.error("close client pool error", e);
}
}
-
connections.clear();
}
-
- try {
- accepted.close();
- } catch (Throwable t) {
- LOG.error(t, t);
- }
}
- public synchronized void shutdown(){
+ public void shutdown(){
close();
RpcChannelFactory.shutdownGracefully();
}
@@ -165,16 +158,19 @@ public class RpcConnectionPool {
final Class<?> protocolClass;
final boolean asyncMode;
+ final String description;
+
public RpcConnectionKey(InetSocketAddress addr,
Class<?> protocolClass, boolean asyncMode) {
this.addr = addr;
this.protocolClass = protocolClass;
this.asyncMode = asyncMode;
+ this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
}
@Override
public String toString() {
- return "["+ protocolClass + "] " + addr + "," + asyncMode;
+ return description;
}
@Override
@@ -188,7 +184,7 @@ public class RpcConnectionPool {
@Override
public int hashCode() {
- return Objects.hashCode(addr, asyncMode);
+ return description.hashCode();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
index b6be05f..152d426 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -21,6 +21,7 @@ package org.apache.tajo.rpc;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicReference;
public class RpcUtils {
@@ -65,4 +66,57 @@ public class RpcUtils {
String [] splitted = addr.split(":");
return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
}
+
+ /**
+ * Millisecond countdown used to bound retry loops. The budget given to the
+ * constructor shrinks only when {@link #elapsed()} is called. Fields are plain
+ * longs with no synchronization.
+ */
+ public static class Timer {
+ private long remaining;
+ private long prev;
+
+ /**
+ * @param timeout total budget in milliseconds; a value of zero or less is
+ * already timed out
+ */
+ public Timer(long timeout) {
+ this.remaining = timeout;
+ this.prev = System.currentTimeMillis();
+ }
+
+ /** @return true once the remaining budget is zero or negative */
+ public boolean isTimedOut() {
+ return remaining <= 0;
+ }
+
+ /**
+ * Charges the wall-clock time that passed since construction or the previous
+ * call against the remaining budget.
+ */
+ public void elapsed() {
+ long current = System.currentTimeMillis();
+ // elapsed time is (current - prev); the reversed operands made the budget grow,
+ // so isTimedOut() could never become true
+ remaining -= (current - prev);
+ prev = current;
+ }
+
+ /**
+ * Sleeps for {@code wait} milliseconds, capped at the remaining budget.
+ * Returns immediately when {@code wait} is not positive or the budget is spent.
+ *
+ * @param wait requested pause in milliseconds
+ */
+ public void interval(long wait) {
+ if (wait <= 0 || isTimedOut()) {
+ return;
+ }
+ try {
+ Thread.sleep(Math.min(remaining, wait));
+ } catch (Exception ex) {
+ // ignore
+ // NOTE(review): this also swallows InterruptedException, which clears the
+ // thread's interrupt status; confirm no caller relies on seeing the interrupt
+ }
+ }
+
+ /** @return the remaining budget in milliseconds; may be zero or negative */
+ public long remaining() {
+ return remaining;
+ }
+ }
+
+ /**
+ * Holds at most one value in an AtomicReference so that, among several callers
+ * arriving at once, exactly one gets its own ticket installed and the others
+ * receive that same installed value.
+ */
+ public static class Scrutineer<T> {
+
+ private final AtomicReference<T> reference = new AtomicReference<T>();
+
+ /**
+ * Returns the value currently held, installing {@code ticket} first if none is held.
+ * A caller can tell it was the one that installed by comparing the result
+ * with its own ticket by identity.
+ *
+ * @param ticket value to install when the holder is empty
+ * @return {@code ticket} if this call installed it, otherwise the value already held
+ */
+ T check(T ticket) {
+ T granted = reference.get();
+ // retry while empty: a failed compareAndSet means another caller installed first, so re-read
+ for (;granted == null; granted = reference.get()) {
+ if (reference.compareAndSet(null, ticket)) {
+ return ticket;
+ }
+ }
+ return granted;
+ }
+
+ /**
+ * Empties the holder, but only if it still holds {@code granted}.
+ *
+ * @param granted the value expected to be held
+ * @return true if the holder held {@code granted} and was reset to empty
+ */
+ boolean clear(T granted) {
+ return reference.compareAndSet(granted, null);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 31d5265..a974a65 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -34,8 +34,6 @@ import org.junit.rules.ExternalResource;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
-import io.netty.channel.ConnectTimeoutException;
-
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -125,8 +123,12 @@ public class TestAsyncRpc {
public void setUpRpcClient() throws Exception {
retries = 1;
- client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(
+ RpcUtils.getConnectAddress(server.getListenAddress()),
+ DummyProtocol.class, true);
+ client = new AsyncRpcClient(rpcConnectionKey, retries);
+ // fail the setup right away if the connection cannot be acquired, instead of
+ // letting a later stub call fail on an unconnected channel
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
stub = client.getStub();
}
@@ -296,7 +298,10 @@ public class TestAsyncRpc {
});
serverThread.start();
- client = new AsyncRpcClient(DummyProtocol.class, address, retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+ client = new AsyncRpcClient(rpcConnectionKey, retries);
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
stub = client.getStub();
stub.echo(future.getController(), echoMessage, future);
@@ -308,24 +313,25 @@ public class TestAsyncRpc {
@Test
public void testConnectionFailure() throws Exception {
InetSocketAddress address = new InetSocketAddress("test", 0);
- boolean expected = false;
try {
- new AsyncRpcClient(DummyProtocol.class, address, retries);
- fail();
- } catch (ConnectTimeoutException e) {
- expected = true;
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+ NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
+ assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
} catch (Throwable throwable) {
fail();
}
- assertTrue(expected);
}
@Test
@SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
- client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(
+ RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
+ client = new AsyncRpcClient(rpcConnectionKey, retries);
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
Interface stub = client.getStub();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 07e2dca..10dd766 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -35,7 +35,6 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
@@ -116,8 +115,12 @@ public class TestBlockingRpc {
public void setUpRpcClient() throws Exception {
retries = 1;
- client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(
+ RpcUtils.getConnectAddress(server.getListenAddress()),
+ DummyProtocol.class, false);
+ client = new BlockingRpcClient(rpcConnectionKey, retries);
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
stub = client.getStub();
}
@@ -238,7 +241,10 @@ public class TestBlockingRpc {
});
serverThread.start();
- client = new BlockingRpcClient(DummyProtocol.class, address, retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false);
+ client = new BlockingRpcClient(rpcConnectionKey, retries);
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
stub = client.getStub();
EchoMessage response = stub.echo(null, message);
@@ -247,24 +253,23 @@ public class TestBlockingRpc {
@Test
public void testConnectionFailed() throws Exception {
- boolean expected = false;
NettyClientBase client = null;
try {
int port = server.getListenAddress().getPort() + 1;
- client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(
+ RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
+ DummyProtocol.class, false);
+ client = new BlockingRpcClient(rpcConnectionKey, retries);
+ assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
client.close();
- fail("Connection should be failed.");
- } catch (ConnectException ce) {
- expected = true;
} catch (Throwable ce){
if (client != null) {
client.close();
}
fail();
}
- assertTrue(expected);
}
@Test
@@ -329,8 +334,11 @@ public class TestBlockingRpc {
@SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
- client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), retries);
+ RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+ new RpcConnectionPool.RpcConnectionKey(
+ RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
+ client = new BlockingRpcClient(rpcConnectionKey, retries);
+ assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
BlockingInterface stub = client.getStub();
EchoMessage message = EchoMessage.newBuilder()