You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/16 02:04:18 UTC

tajo git commit: TAJO-1391: RpcConnectionPool should check reference counter of connection before close

Repository: tajo
Updated Branches:
  refs/heads/master e1e38e231 -> 0dc7d6807


TAJO-1391: RpcConnectionPool should check reference counter of connection before close

Closes #412

Signed-off-by: Jihun Kang <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dc7d680
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dc7d680
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dc7d680

Branch: refs/heads/master
Commit: 0dc7d68071dcf7c9d01dde8ed7598ca422e4c50c
Parents: e1e38e2
Author: navis.ryu <na...@apache.org>
Authored: Mon Mar 16 10:03:10 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Mon Mar 16 10:03:10 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/client/SessionConnection.java   |   2 +-
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  63 ++------
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  68 ++-------
 .../org/apache/tajo/rpc/NettyClientBase.java    | 148 ++++++++++++-------
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 112 +++++++-------
 .../main/java/org/apache/tajo/rpc/RpcUtils.java |  54 +++++++
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |  32 ++--
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  34 +++--
 9 files changed, 272 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 84a7571..9d2cd14 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1391: RpcConnectionPool should check reference counter of connection 
+    before close. (Contributed by navis, Committed by jihun)
+
     TAJO-1383: Improve broadcast table cache. (jinho)
 
     TAJO-1374: Support multi-bytes delimiter for CSV file.

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index d05d3b1..d24e7b3 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -128,7 +128,7 @@ public class SessionConnection implements Closeable {
     if(!closed.get()){
       try {
         return connPool.getConnection(serviceTracker.getClientServiceAddress(),
-            TajoMasterClientProtocol.class, false).isActive();
+            TajoMasterClientProtocol.class, false).isConnected();
       } catch (Throwable e) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 5845229..1ea9fb1 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -24,6 +24,7 @@ import com.google.protobuf.*;
 import io.netty.channel.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
@@ -34,62 +35,33 @@ import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 
 public class AsyncRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
 
-  private final ChannelInitializer<Channel> initializer;
-  private final ProxyRpcChannel rpcChannel;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
   private final Map<Integer, ResponseCallback> requests =
       new ConcurrentHashMap<Integer, ResponseCallback>();
 
-  private final Class<?> protocol;
   private final Method stubMethod;
-
-  private RpcConnectionKey key;
+  private final ProxyRpcChannel rpcChannel;
+  private final ClientChannelInboundHandler inboundHandler;
 
   /**
    * Intentionally make this method package-private, avoiding user directly
    * new an instance through this constructor.
    */
-  AsyncRpcClient(final Class<?> protocol,
-                        final InetSocketAddress addr, int retries)
-      throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
-
-    this.protocol = protocol;
-    String serviceClassName = protocol.getName() + "$"
-        + protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
-
-    initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), 
-        RpcResponse.getDefaultInstance());
-    super.init(addr, initializer, retries);
+  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, retries);
+    stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class);
     rpcChannel = new ProxyRpcChannel();
-    this.key = new RpcConnectionKey(addr, protocol, true);
-  }
-
-  @Override
-  public RpcConnectionKey getKey() {
-    return key;
+    inboundHandler = new ClientChannelInboundHandler();
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
   }
 
   @Override
   public <T> T getStub() {
-    try {
-      return (T) stubMethod.invoke(null, rpcChannel);
-    } catch (Exception e) {
-      throw new RemoteException(e.getMessage(), e);
-    }
-  }
-
-  public RpcChannel getRpcChannel() {
-    return this.rpcChannel;
+    return getStub(stubMethod, rpcChannel);
   }
 
   protected void sendExceptions(String message) {
@@ -113,17 +85,6 @@ public class AsyncRpcClient extends NettyClientBase {
   }
 
   private class ProxyRpcChannel implements RpcChannel {
-    private final ClientChannelInboundHandler handler;
-
-    public ProxyRpcChannel() {
-      this.handler = getChannel().pipeline()
-          .get(ClientChannelInboundHandler.class);
-
-      if (handler == null) {
-        throw new IllegalArgumentException("Channel does not have " +
-            "proper handler");
-      }
-    }
 
     public void callMethod(final MethodDescriptor method,
                            final RpcController controller,
@@ -135,7 +96,7 @@ public class AsyncRpcClient extends NettyClientBase {
 
       Message rpcRequest = buildRequest(nextSeqId, method, param);
 
-      handler.registerCallback(nextSeqId,
+      inboundHandler.registerCallback(nextSeqId,
           new ResponseCallback(controller, responseType, done));
 
       ChannelPromise channelPromise = getChannel().newPromise();
@@ -144,7 +105,7 @@ public class AsyncRpcClient extends NettyClientBase {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (!future.isSuccess()) {
-            handler.exceptionCaught(null, new ServiceException(future.cause()));
+            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
           }
         }
       });

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 4ec5718..6a90330 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -25,6 +25,7 @@ import io.netty.channel.*;
 import io.netty.util.concurrent.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
@@ -35,63 +36,33 @@ import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.*;
 import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 
 public class BlockingRpcClient extends NettyClientBase {
   private static final Log LOG = LogFactory.getLog(RpcProtos.class);
 
-  private final ChannelInitializer<Channel> initializer;
-  private final ProxyRpcChannel rpcChannel;
-
-  private final AtomicInteger sequence = new AtomicInteger(0);
   private final Map<Integer, ProtoCallFuture> requests =
       new ConcurrentHashMap<Integer, ProtoCallFuture>();
 
-  private final Class<?> protocol;
   private final Method stubMethod;
-
-  private RpcConnectionKey key;
+  private final ProxyRpcChannel rpcChannel;
+  private final ChannelInboundHandlerAdapter inboundHandler;
 
   /**
    * Intentionally make this method package-private, avoiding user directly
    * new an instance through this constructor.
    */
-  BlockingRpcClient(final Class<?> protocol,
-                           final InetSocketAddress addr, int retries)
-      throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
-
-    this.protocol = protocol;
-    String serviceClassName = protocol.getName() + "$"
-        + protocol.getSimpleName() + "Service";
-    Class<?> serviceClass = Class.forName(serviceClassName);
-    stubMethod = serviceClass.getMethod("newBlockingStub",
-        BlockingRpcChannel.class);
-
-    initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance());
-    super.init(addr, initializer, retries);
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    super(rpcConnectionKey, retries);
+    stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
     rpcChannel = new ProxyRpcChannel();
-
-    this.key = new RpcConnectionKey(addr, protocol, false);
-  }
-
-  @Override
-  public RpcConnectionKey getKey() {
-    return key;
+    inboundHandler = new ClientChannelInboundHandler();
+    init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance()));
   }
 
   @Override
   public <T> T getStub() {
-    try {
-      return (T) stubMethod.invoke(null, rpcChannel);
-    } catch (Exception e) {
-      throw new RuntimeException(e.getMessage(), e);
-    }
-  }
-
-  public BlockingRpcChannel getBlockingRpcChannel() {
-    return this.rpcChannel;
+    return getStub(stubMethod, rpcChannel);
   }
 
   @Override
@@ -106,19 +77,6 @@ public class BlockingRpcClient extends NettyClientBase {
 
   private class ProxyRpcChannel implements BlockingRpcChannel {
 
-    private final ClientChannelInboundHandler handler;
-
-    public ProxyRpcChannel() {
-
-      this.handler = getChannel().pipeline().
-          get(ClientChannelInboundHandler.class);
-
-      if (handler == null) {
-        throw new IllegalArgumentException("Channel does not have " +
-            "proper handler");
-      }
-    }
-
     @Override
     public Message callBlockingMethod(final MethodDescriptor method,
                                       final RpcController controller,
@@ -139,7 +97,7 @@ public class BlockingRpcClient extends NettyClientBase {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
           if (!future.isSuccess()) {
-            handler.exceptionCaught(null, new ServiceException(future.cause()));
+            inboundHandler.exceptionCaught(null, new ServiceException(future.cause()));
           }
         }
       });
@@ -174,7 +132,7 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   private String getErrorMessage(String message) {
-    if(protocol != null && getChannel() != null) {
+    if(getChannel() != null) {
       return protocol.getName() +
           "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
           getChannel().remoteAddress()) + "): " + message;
@@ -184,7 +142,7 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
-    if(protocol != null && getChannel() != null) {
+    if(getChannel() != null) {
       return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
           RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 7b52178..7dfc5a2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,9 +20,9 @@ package org.apache.tajo.rpc;
 
 import io.netty.channel.*;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -30,77 +30,125 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class NettyClientBase implements Closeable {
-  private static Log LOG = LogFactory.getLog(NettyClientBase.class);
-  private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
+  private static final Log LOG = LogFactory.getLog(NettyClientBase.class);
+  private static final int CONNECTION_TIMEOUT = 60000;  // 60 sec
   private static final long PAUSE = 1000; // 1 sec
-  private int numRetries;
 
-  protected Bootstrap bootstrap;
-  private ChannelFuture channelFuture;
+  private final int numRetries;
 
-  public NettyClientBase() {
-  }
+  private Bootstrap bootstrap;
+  private volatile ChannelFuture channelFuture;
 
-  public abstract <T> T getStub();
-  public abstract RpcConnectionPool.RpcConnectionKey getKey();
-  
-  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, 
-      int numRetries) throws ConnectTimeoutException {
+  protected final Class<?> protocol;
+  protected final AtomicInteger sequence = new AtomicInteger(0);
+
+  private final RpcConnectionKey key;
+  private final AtomicInteger counter = new AtomicInteger(0);   // reference counter
+
+  public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries)
+      throws ClassNotFoundException, NoSuchMethodException {
+    this.key = rpcConnectionKey;
+    this.protocol = rpcConnectionKey.protocolClass;
     this.numRetries = numRetries;
-    
-    init(addr, initializer);
   }
 
-  public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer)
-      throws ConnectTimeoutException {
+  // should be called from sub class
+  protected void init(ChannelInitializer<Channel> initializer) {
     this.bootstrap = new Bootstrap();
     this.bootstrap
       .channel(NioSocketChannel.class)
       .handler(initializer)
       .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
       .option(ChannelOption.SO_REUSEADDR, true)
-      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+      .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT)
       .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
       .option(ChannelOption.TCP_NODELAY, true);
+  }
 
-    connect(addr);
+  public RpcConnectionPool.RpcConnectionKey getKey() {
+    return key;
   }
 
-  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+  protected final Class<?> getServiceClass() throws ClassNotFoundException {
+    String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
+    return Class.forName(serviceClassName);
+  }
 
+  @SuppressWarnings("unchecked")
+  protected final <T> T getStub(Method stubMethod, Object rpcChannel) {
+    try {
+      return (T) stubMethod.invoke(null, rpcChannel);
+    } catch (Exception e) {
+      throw new RemoteException(e.getMessage(), e);
+    }
+  }
+
+  public abstract <T> T getStub();
+
+  public boolean acquire(long timeout) {
+    if (!checkConnection(timeout)) {
+      return false;
+    }
+    counter.incrementAndGet();
+    return true;
+  }
+
+  public boolean release() {
+    return counter.decrementAndGet() == 0;
+  }
+
+  private boolean checkConnection(long timeout) {
+    if (isConnected()) {
+      return true;
+    }
+
+    InetSocketAddress addr = key.addr;
+    if (addr.isUnresolved()) {
+      addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+    }
+
+    return handleConnectionInternally(addr, timeout);
+  }
+
+  private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+    LOG.warn("Try to connect : " + address);
     this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
             .connect(address)
             .addListener(listener);
   }
-  
-  private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch);
-    connectUsingNetty(addr, listener);
+
+  // first attendant kicks connection
+  private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>();
+
+  private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) {
+    final CountDownLatch ticket = new CountDownLatch(1);
+    final CountDownLatch granted = connect.check(ticket);
+
+    if (ticket == granted) {
+      connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
+    }
 
     try {
-      latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS);
+      granted.await(timeout, TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
+      // ignore
     }
 
-    if (!channelFuture.isSuccess()) {
-      throw new ConnectTimeoutException("Connect error to " + addr +
-          " caused by " + ExceptionUtils.getMessage(channelFuture.cause()));
-    }
-  }
+    boolean success = channelFuture.isSuccess();
 
-  public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
-    if(addr.isUnresolved()){
-       addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+    if (granted.getCount() == 0) {
+      connect.clear(granted);
     }
 
-    handleConnectionInternally(addr);
+    return success;
   }
 
   class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
@@ -142,32 +190,26 @@ public abstract class NettyClientBase implements Closeable {
     }
   }
 
-  public boolean isActive() {
-    return getChannel().isActive();
+  public Channel getChannel() {
+    return channelFuture == null ? null : channelFuture.channel();
   }
 
-  public InetSocketAddress getRemoteAddress() {
-    if (channelFuture == null || channelFuture.channel() == null) {
-      return null;
-    }
-    return (InetSocketAddress) channelFuture.channel().remoteAddress();
+  public boolean isConnected() {
+    Channel channel = getChannel();
+    return channel != null && channel.isOpen() && channel.isActive();
   }
 
-  public Channel getChannel() {
-    return channelFuture.channel();
+  public SocketAddress getRemoteAddress() {
+    Channel channel = getChannel();
+    return channel == null ? null : channel.remoteAddress();
   }
 
   @Override
   public void close() {
-    if (channelFuture != null && getChannel().isActive()) {
-      getChannel().close();
-    }
-
-    if (this.bootstrap != null) {
-      InetSocketAddress address = getRemoteAddress();
-      if (address != null) {
-        LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
-      }
+    Channel channel = getChannel();
+    if (channel != null && channel.isOpen()) {
+      LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+      channel.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 43feeb1..6d1f479 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -18,13 +18,9 @@
 
 package org.apache.tajo.rpc;
 
-import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import io.netty.channel.ConnectTimeoutException;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.internal.logging.CommonsLoggerFactory;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 
@@ -37,7 +33,6 @@ public class RpcConnectionPool {
 
   private Map<RpcConnectionKey, NettyClientBase> connections =
       new HashMap<RpcConnectionKey, NettyClientBase>();
-  private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
   private static RpcConnectionPool instance;
   private final Object lockObject = new Object();
@@ -59,103 +54,101 @@ public class RpcConnectionPool {
       throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     NettyClientBase client;
     if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
-          RPC_RETRIES);
+      client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES);
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, 
-          RPC_RETRIES);
+      client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES);
     }
-    accepted.add(client.getChannel());
     return client;
   }
 
+  public static final long DEFAULT_TIMEOUT = 3000;
+  public static final long DEFAULT_INTERVAL = 500;
+
   public NettyClientBase getConnection(InetSocketAddress addr,
                                        Class<?> protocolClass, boolean asyncMode)
       throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
+    return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL);
+  }
+
+  public NettyClientBase getConnection(InetSocketAddress addr,
+      Class<?> protocolClass, boolean asyncMode, long timeout, long interval)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
-    NettyClientBase client = connections.get(key);
 
-    if (client == null) {
-      synchronized (lockObject){
+    RpcUtils.Timer timer = new RpcUtils.Timer(timeout);
+    for (; !timer.isTimedOut(); timer.elapsed()) {
+      NettyClientBase client;
+      synchronized (lockObject) {
         client = connections.get(key);
         if (client == null) {
-          client = makeConnection(key);
-          connections.put(key, client);
+          connections.put(key, client = makeConnection(key));
         }
       }
+      if (client.acquire(timer.remaining())) {
+        return client;
+      }
+      timer.interval(interval);
     }
 
-    if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
-      LOG.warn("Try to reconnect : " + addr);
-      client.connect(addr);
-    }
-    return client;
+    throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec");
   }
 
   public void releaseConnection(NettyClientBase client) {
-    if (client == null) return;
-
-    try {
-      synchronized (lockObject) {
-        if (!client.getChannel().isOpen()) {
-          connections.remove(client.getKey());
-          client.close();
-        }
-      }
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size());
-
-      }
-    } catch (Exception e) {
-      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
-    }
+    release(client, false);
   }
 
   public void closeConnection(NettyClientBase client) {
+    release(client, true);
+  }
+
+  private void release(NettyClientBase client, boolean close) {
     if (client == null) {
       return;
     }
-
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Close connection [" + client.getKey() + "]");
+    }
     try {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Close connection [" + client.getKey() + "]");
-      }
-
-      synchronized (lockObject) {
-        connections.remove(client.getKey());
+      if (returnToPool(client, close)) {
         client.close();
       }
-
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Current Connections [" + connections.size() + "]");
+      }
     } catch (Exception e) {
       LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
     }
   }
 
-  public synchronized void close() {
+  // return true if the connection should be closed
+  private boolean returnToPool(NettyClientBase client, boolean close) {
+    synchronized (lockObject) {
+      if (client.release() && (close || !client.isConnected())) {
+        connections.remove(client.getKey());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void close() {
     if(LOG.isDebugEnabled()) {
       LOG.debug("Pool Closed");
     }
-    synchronized(lockObject) {
-      for(NettyClientBase eachClient: connections.values()) {
+
+    synchronized (lockObject) {
+      for (NettyClientBase eachClient : connections.values()) {
         try {
           eachClient.close();
         } catch (Exception e) {
           LOG.error("close client pool error", e);
         }
       }
-
       connections.clear();
     }
-
-    try {
-      accepted.close();
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    }
   }
 
-  public synchronized void shutdown(){
+  public void shutdown(){
     close();
     RpcChannelFactory.shutdownGracefully();
   }
@@ -165,16 +158,19 @@ public class RpcConnectionPool {
     final Class<?> protocolClass;
     final boolean asyncMode;
 
+    final String description;
+
     public RpcConnectionKey(InetSocketAddress addr,
                             Class<?> protocolClass, boolean asyncMode) {
       this.addr = addr;
       this.protocolClass = protocolClass;
       this.asyncMode = asyncMode;
+      this.description = "["+ protocolClass + "] " + addr + "," + asyncMode;
     }
 
     @Override
     public String toString() {
-      return "["+ protocolClass + "] " + addr + "," + asyncMode;
+      return description;
     }
 
     @Override
@@ -188,7 +184,7 @@ public class RpcConnectionPool {
 
     @Override
     public int hashCode() {
-      return Objects.hashCode(addr, asyncMode);
+      return description.hashCode();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
index b6be05f..152d426 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java
@@ -21,6 +21,7 @@ package org.apache.tajo.rpc;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class RpcUtils {
 
@@ -65,4 +66,57 @@ public class RpcUtils {
     String [] splitted = addr.split(":");
     return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1]));
   }
+
+  public static class Timer {
+    private long remaining;
+    private long prev;
+    public Timer(long timeout) {
+      this.remaining = timeout;
+      this.prev = System.currentTimeMillis();
+    }
+
+    public boolean isTimedOut() {
+      return remaining <= 0;
+    }
+
+    public void elapsed() {
+      long current = System.currentTimeMillis();
+      remaining -= (prev - current);
+      prev = current;
+    }
+
+    public void interval(long wait) {
+      if (wait <= 0 || isTimedOut()) {
+        return;
+      }
+      try {
+        Thread.sleep(Math.min(remaining, wait));
+      } catch (Exception ex) {
+        // ignore
+      }
+    }
+
+    public long remaining() {
+      return remaining;
+    }
+  }
+
+  public static class Scrutineer<T> {
+
+    private final AtomicReference<T> reference = new AtomicReference<T>();
+
+    T check(T ticket) {
+      T granted = reference.get();
+      for (;granted == null; granted = reference.get()) {
+        if (reference.compareAndSet(null, ticket)) {
+          return ticket;
+        }
+      }
+      return granted;
+    }
+
+    boolean clear(T granted) {
+      return reference.compareAndSet(granted, null);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 31d5265..a974a65 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -34,8 +34,6 @@ import org.junit.rules.ExternalResource;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
-import io.netty.channel.ConnectTimeoutException;
-
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
@@ -125,8 +123,12 @@ public class TestAsyncRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.getConnectAddress(server.getListenAddress()),
+              DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    // Fail the setup right here if the connection cannot be acquired, instead of letting
+    // a later stub call fail with a less obvious error.
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     stub = client.getStub();
   }
 
@@ -296,7 +298,10 @@ public class TestAsyncRpc {
     });
     serverThread.start();
 
-    client = new AsyncRpcClient(DummyProtocol.class, address, retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     stub = client.getStub();
     stub.echo(future.getController(), echoMessage, future);
 
@@ -308,24 +313,25 @@ public class TestAsyncRpc {
   @Test
   public void testConnectionFailure() throws Exception {
     InetSocketAddress address = new InetSocketAddress("test", 0);
-    boolean expected = false;
     try {
-      new AsyncRpcClient(DummyProtocol.class, address, retries);
-      fail();
-    } catch (ConnectTimeoutException e) {
-      expected = true;
+      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true);
+      NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries);
+      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     } catch (Throwable throwable) {
       fail();
     }
-    assertTrue(expected);
   }
 
   @Test
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    client = new AsyncRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true);
+    client = new AsyncRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();

http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 07e2dca..10dd766 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -35,7 +35,6 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.concurrent.CountDownLatch;
@@ -116,8 +115,12 @@ public class TestBlockingRpc {
   public void setUpRpcClient() throws Exception {
     retries = 1;
 
-    client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.getConnectAddress(server.getListenAddress()), retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.getConnectAddress(server.getListenAddress()),
+              DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     stub = client.getStub();
   }
 
@@ -238,7 +241,10 @@ public class TestBlockingRpc {
     });
     serverThread.start();
 
-    client = new BlockingRpcClient(DummyProtocol.class, address, retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     stub = client.getStub();
 
     EchoMessage response = stub.echo(null, message);
@@ -247,24 +253,23 @@ public class TestBlockingRpc {
 
   @Test
   public void testConnectionFailed() throws Exception {
-    boolean expected = false;
     NettyClientBase client = null;
     
     try {
       int port = server.getListenAddress().getPort() + 1;
-      client = new BlockingRpcClient(DummyProtocol.class,
-          RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
+      RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)),
+              DummyProtocol.class, false);
+      client = new BlockingRpcClient(rpcConnectionKey, retries);
+      assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
       client.close();
-      fail("Connection should be failed.");
-    } catch (ConnectException ce) {
-      expected = true;
     } catch (Throwable ce){
       if (client != null) {
         client.close();
       }
       fail();
     }
-    assertTrue(expected);
   }
 
   @Test
@@ -329,8 +334,11 @@ public class TestBlockingRpc {
   @SetupRpcConnection(setupRpcClient=false)
   public void testUnresolvedAddress() throws Exception {
     String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
-    client = new BlockingRpcClient(DummyProtocol.class,
-        RpcUtils.createUnresolved(hostAndPort), retries);
+    RpcConnectionPool.RpcConnectionKey rpcConnectionKey =
+          new RpcConnectionPool.RpcConnectionKey(
+              RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false);
+    client = new BlockingRpcClient(rpcConnectionKey, retries);
+    assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT));
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()