You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/20 09:22:05 UTC

git commit: TAJO-1050: RPC client does not retry during connecting. (Jihun Kang via jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 28282b561 -> 5b31fc420


TAJO-1050: RPC client does not retry during connecting. (Jihun Kang via jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5b31fc42
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5b31fc42
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5b31fc42

Branch: refs/heads/master
Commit: 5b31fc4207ef1de2418cafbc6d3e714fa03849d0
Parents: 28282b5
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 16:21:33 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 16:21:33 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  | 21 +++--
 .../java/org/apache/tajo/client/TajoClient.java | 20 +++--
 .../tajo/master/TajoMasterClientService.java    |  2 +
 .../org/apache/tajo/rpc/AsyncRpcClient.java     | 11 +--
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  4 +-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  | 13 +--
 .../org/apache/tajo/rpc/NettyClientBase.java    | 86 +++++++++++-------
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 16 +++-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  | 91 +++++++++++++++++++-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    | 73 +++++++++++++---
 11 files changed, 261 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5d92881..7b2f77e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -143,6 +143,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1050: RPC client does not retry during connecting. 
+    (Jihun Kang via jinho)
+
     TAJO-948: 'INSERT INTO' statement to non existence table casuses 
     NPE. (Jongyoung Park via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 7c96e34..d5040bd 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -21,8 +21,6 @@ package org.apache.tajo.cli;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
-import jline.TerminalFactory;
-import jline.TerminalFactory.Flavor;
 import jline.UnsupportedTerminal;
 import jline.console.ConsoleReader;
 import org.apache.commons.cli.*;
@@ -36,13 +34,15 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.HAServiceUtil;
 
 import java.io.*;
 import java.lang.reflect.Constructor;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 
 import static org.apache.tajo.cli.ParsedResult.StatementType.META;
 import static org.apache.tajo.cli.ParsedResult.StatementType.STATEMENT;
@@ -401,6 +401,7 @@ public class TajoCli {
             history.addStatement(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
           }
         }
+
         exitCode = executeParsedResults(parsedResults);
         currentPrompt = updatePrompt(parser.getState());
 
@@ -494,7 +495,17 @@ public class TajoCli {
   private int executeQuery(String statement) throws ServiceException, IOException {
     checkMasterStatus();
     long startTime = System.currentTimeMillis();
-    ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
+    ClientProtos.SubmitQueryResponse response = null;
+    try{
+      response = client.executeQuery(statement);
+    } catch (ServiceException e){
+      displayFormatter.printErrorMessage(sout, e.getMessage());
+      wasError = true;
+    } catch(Throwable te){
+      displayFormatter.printErrorMessage(sout, te);
+      wasError = true;
+    }
+
     if (response == null) {
       displayFormatter.printErrorMessage(sout, "response is null");
       wasError = true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index cc993f3..ab3d874 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -61,6 +61,7 @@ import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
 
@@ -82,6 +83,8 @@ public class TajoClient implements Closeable {
 
   private volatile TajoIdProtos.SessionIdProto sessionId;
 
+  private AtomicBoolean closed = new AtomicBoolean(false);
+
   public TajoClient(TajoConf conf) throws IOException {
     this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
   }
@@ -115,11 +118,14 @@ public class TajoClient implements Closeable {
   }
 
   public boolean isConnected() {
-    try {
-      return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
-    } catch (Exception e) {
-      return false;
+    if(!closed.get()){
+      try {
+        return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+      } catch (Throwable e) {
+        return false;
+      }
     }
+    return false;
   }
 
   public TajoClient(InetSocketAddress addr) throws IOException {
@@ -152,12 +158,16 @@ public class TajoClient implements Closeable {
 
   @Override
   public void close() {
+    if(closed.getAndSet(true)){
+      return;
+    }
+
     // remove session
     try {
       NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
-    } catch (Exception e) {
+    } catch (Throwable e) {
     }
 
     if(connPool != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index e69393a..738b643 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -264,6 +264,8 @@ public class TajoMasterClientService extends AbstractService {
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         SubmitQueryResponse.Builder responseBuilder = ClientProtos.SubmitQueryResponse.newBuilder();
+        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        responseBuilder.setIsForwarded(true);
         responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
         responseBuilder.setResultCode(ResultCode.ERROR);
         if (e.getMessage() != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index c84d6b6..7a416a8 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -57,13 +57,8 @@ public class AsyncRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   AsyncRpcClient(final Class<?> protocol,
-                 final InetSocketAddress addr) throws Exception {
-    this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
-  }
-
-  AsyncRpcClient(final Class<?> protocol,
-                        final InetSocketAddress addr, ClientSocketChannelFactory factory)
-      throws Exception {
+                        final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+      throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
 
     this.protocol = protocol;
     String serviceClassName = protocol.getName() + "$"
@@ -74,7 +69,7 @@ public class AsyncRpcClient extends NettyClientBase {
     this.handler = new ClientChannelUpstreamHandler();
     pipeFactory = new ProtoPipelineFactory(handler,
         RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory, factory);
+    super.init(addr, pipeFactory, factory, retries);
     rpcChannel = new ProxyRpcChannel();
     this.key = new RpcConnectionKey(addr, protocol, true);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index b7e3cb6..f9c5d3b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -126,11 +126,13 @@ public class AsyncRpcServer extends NettyServerBase {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
         throws Exception{
+
       if (e.getCause() instanceof RemoteCallException) {
         RemoteCallException callException = (RemoteCallException) e.getCause();
         e.getChannel().write(callException.getResponse());
+      } else {
+        LOG.error(e.getCause());
       }
-      throw new RemoteException(serviceName, e.getCause());
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 3d6989a..03d5d3e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -60,13 +60,8 @@ public class BlockingRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   BlockingRpcClient(final Class<?> protocol,
-                 final InetSocketAddress addr) throws Exception {
-    this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
-  }
-
-  BlockingRpcClient(final Class<?> protocol,
-                           final InetSocketAddress addr, ClientSocketChannelFactory factory)
-      throws Exception {
+                           final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+      throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
 
     this.protocol = protocol;
     String serviceClassName = protocol.getName() + "$"
@@ -78,7 +73,7 @@ public class BlockingRpcClient extends NettyClientBase {
     this.handler = new ClientChannelUpstreamHandler();
     pipeFactory = new ProtoPipelineFactory(handler,
         RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory, factory);
+    super.init(addr, pipeFactory, factory, retries);
     rpcChannel = new ProxyRpcChannel();
 
     this.key = new RpcConnectionKey(addr, protocol, false);
@@ -227,7 +222,7 @@ public class BlockingRpcClient extends NettyClientBase {
     }
   }
 
-  class ProtoCallFuture implements Future<Message> {
+ static class ProtoCallFuture implements Future<Message> {
     private Semaphore sem = new Semaphore(0);
     private Message response = null;
     private Message returnType;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 711c527..d0002de 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,25 +18,25 @@
 
 package org.apache.tajo.rpc;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class NettyClientBase implements Closeable {
   private static Log LOG = LogFactory.getLog(NettyClientBase.class);
   private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
+  private static final long PAUSE = 1000; // 1 sec
+  private int numRetries;
 
   protected ClientBootstrap bootstrap;
   private ChannelFuture channelFuture;
@@ -46,40 +46,54 @@ public abstract class NettyClientBase implements Closeable {
 
   public abstract <T> T getStub();
   public abstract RpcConnectionPool.RpcConnectionKey getKey();
+  
+  public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory, 
+      int numRetries) throws ConnectTimeoutException {
+    this.numRetries = numRetries;
+    
+    init(addr, pipeFactory, factory);
+  }
 
   public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
-      throws IOException {
-    try {
-
-      this.bootstrap = new ClientBootstrap(factory);
-      this.bootstrap.setPipelineFactory(pipeFactory);
-      // TODO - should be configurable
-      this.bootstrap.setOption("connectTimeoutMillis", 10000);
-      this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
-      this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
-      this.bootstrap.setOption("tcpNoDelay", true);
-      this.bootstrap.setOption("keepAlive", true);
-
-      connect(addr);
-    } catch (IOException e) {
-      close();
-      throw e;
-    } catch (Throwable t) {
-      throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause());
-    }
+      throws ConnectTimeoutException {
+    this.bootstrap = new ClientBootstrap(factory);
+    this.bootstrap.setPipelineFactory(pipeFactory);
+    // TODO - should be configurable
+    this.bootstrap.setOption("connectTimeoutMillis", 10000);
+    this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
+    this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
+    this.bootstrap.setOption("tcpNoDelay", true);
+    this.bootstrap.setOption("keepAlive", true);
+
+    connect(addr);
   }
-
-  public void connect(InetSocketAddress addr) throws Exception {
-    if(addr.isUnresolved()){
-       addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
-    }
+  
+  private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
     this.channelFuture = bootstrap.connect(addr);
 
     final CountDownLatch latch = new CountDownLatch(1);
     this.channelFuture.addListener(new ChannelFutureListener() {
+      private final AtomicInteger retryCount = new AtomicInteger();
+      
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
-        latch.countDown();
+        if (!future.isSuccess()) {
+          if (numRetries > retryCount.getAndIncrement()) {
+            Thread.sleep(PAUSE);
+            channelFuture = bootstrap.connect(addr);
+            channelFuture.addListener(this);
+            
+            LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect.");
+          }
+          else {
+            latch.countDown();
+
+            LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+          }
+        }
+        else {
+          latch.countDown();
+        }
       }
     });
 
@@ -88,12 +102,20 @@ public abstract class NettyClientBase implements Closeable {
     } catch (InterruptedException e) {
     }
 
-
     if (!channelFuture.isSuccess()) {
-      throw new RuntimeException(channelFuture.getCause());
+      throw new ConnectTimeoutException("Connect error to " + addr +
+          " caused by " + ExceptionUtils.getMessage(channelFuture.getCause()));
     }
   }
 
+  public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
+    if(addr.isUnresolved()){
+       addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+    }
+
+    handleConnectionInternally(addr);
+  }
+
   public boolean isConnected() {
     return getChannel().isConnected();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index aba9c63..2f3d433 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -22,9 +22,12 @@ import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.conf.TajoConf;
+import org.jboss.netty.channel.ConnectTimeoutException;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.logging.CommonsLoggerFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +45,8 @@ public class RpcConnectionPool {
   private final ClientSocketChannelFactory channelFactory;
   private final TajoConf conf;
 
+  public final static int RPC_RETRIES = 3;
+
   private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
     this.conf = conf;
     this.channelFactory =  channelFactory;
@@ -49,6 +54,7 @@ public class RpcConnectionPool {
 
   public synchronized static RpcConnectionPool getPool(TajoConf conf) {
     if(instance == null) {
+      InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
       instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
     }
     return instance;
@@ -58,19 +64,21 @@ public class RpcConnectionPool {
     return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
   }
 
-  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws Exception {
+  private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     NettyClientBase client;
     if(rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
+      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
+      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
     }
     accepted.add(client.getChannel());
     return client;
   }
 
   public NettyClientBase getConnection(InetSocketAddress addr,
-                                       Class protocolClass, boolean asyncMode) throws Exception {
+                                       Class protocolClass, boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
     NettyClientBase client = connections.get(key);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 72223c1..7c8246a 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -28,6 +28,8 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
 import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.ConnectTimeoutException;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,15 +52,20 @@ public class TestAsyncRpc {
   static AsyncRpcClient client;
   static Interface stub;
   static DummyProtocolAsyncImpl service;
+  ClientSocketChannelFactory clientChannelFactory;
+  int retries;
 
   @Before
   public void setUp() throws Exception {
+    retries = 1;
+
+    clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2);
     service = new DummyProtocolAsyncImpl();
     server = new AsyncRpcServer(DummyProtocol.class,
         service, new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new AsyncRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()));
+        NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
     stub = client.getStub();
   }
 
@@ -67,9 +74,14 @@ public class TestAsyncRpc {
     if(client != null) {
       client.close();
     }
+
     if(server != null) {
       server.shutdown();
     }
+
+    if (clientChannelFactory != null) {
+      clientChannelFactory.releaseExternalResources();
+    }
   }
 
   boolean calledMarker = false;
@@ -170,9 +182,83 @@ public class TestAsyncRpc {
   }
 
   @Test
+  public void testStubDisconnected() throws Exception {
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    server.shutdown();
+    server = null;
+
+    stub = client.getStub();
+    stub.echo(future.getController(), echoMessage, future);
+    EchoMessage response = future.get();
+
+    assertNull(response);
+    assertTrue(future.getController().failed());
+    assertTrue(future.getController().errorText() != null);
+  }
+
+  @Test
+  public void testConnectionRetry() throws Exception {
+    retries = 10;
+    final InetSocketAddress address = server.getListenAddress();
+    tearDown();
+
+    EchoMessage echoMessage = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+    CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+    //lazy startup
+    Thread serverThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(100);
+          server = new AsyncRpcServer(DummyProtocol.class,
+              service, address, 2);
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+        server.start();
+      }
+    });
+    serverThread.start();
+
+    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+    client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+    stub = client.getStub();
+    stub.echo(future.getController(), echoMessage, future);
+
+    assertFalse(future.isDone());
+    assertEquals(echoMessage, future.get());
+    assertTrue(future.isDone());
+  }
+
+  @Test
+  public void testConnectionFailure() throws Exception {
+    InetSocketAddress address = new InetSocketAddress("test", 0);
+    boolean expected = false;
+    try {
+      new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+      fail();
+    } catch (ConnectTimeoutException e) {
+      expected = true;
+    } catch (Throwable throwable) {
+      fail();
+    }
+    assertTrue(expected);
+  }
+
+  @Test
   public void testUnresolvedAddress() throws Exception {
+    client.close();
+    client = null;
+
     String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
-    AsyncRpcClient client = new AsyncRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort));
+    client = new AsyncRpcClient(DummyProtocol.class,
+        NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
     Interface stub = client.getStub();
     EchoMessage echoMessage = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
@@ -182,6 +268,5 @@ public class TestAsyncRpc {
     assertFalse(future.isDone());
     assertEquals(future.get(), echoMessage);
     assertTrue(future.isDone());
-    client.close();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 3fc51c6..28a3fad 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -26,11 +26,11 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest;
 import org.apache.tajo.rpc.test.TestProtos.SumResponse;
 import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
 import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
@@ -45,15 +45,21 @@ public class TestBlockingRpc {
   private BlockingRpcClient client;
   private BlockingInterface stub;
   private DummyProtocolBlockingImpl service;
+  private int retries;
+  private ClientSocketChannelFactory clientChannelFactory;
 
   @Before
   public void setUp() throws Exception {
+    retries = 1;
+
+    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+
     service = new DummyProtocolBlockingImpl();
     server = new BlockingRpcServer(DummyProtocol.class, service,
         new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new BlockingRpcClient(DummyProtocol.class,
-        NetUtils.getConnectAddress(server.getListenAddress()));
+        NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
     stub = client.getStub();
   }
 
@@ -62,9 +68,14 @@ public class TestBlockingRpc {
     if(client != null) {
       client.close();
     }
+
     if(server != null) {
       server.shutdown();
     }
+
+    if(clientChannelFactory != null){
+      clientChannelFactory.releaseExternalResources();
+    }
   }
 
   @Test
@@ -85,6 +96,7 @@ public class TestBlockingRpc {
 
   @Test
   public void testRpcWithServiceCallable() throws Exception {
+    RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2);
     final SumRequest request = SumRequest.newBuilder()
         .setX1(1)
         .setX2(2)
@@ -92,7 +104,7 @@ public class TestBlockingRpc {
         .setX4(2.0f).build();
 
     SumResponse response =
-    new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+    new ServerCallable<SumResponse>(pool,
         server.getListenAddress(), DummyProtocol.class, false) {
       @Override
       public SumResponse call(NettyClientBase client) throws Exception {
@@ -105,7 +117,7 @@ public class TestBlockingRpc {
     assertEquals(8.15d, response.getResult(), 1e-15);
 
     response =
-        new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+        new ServerCallable<SumResponse>(pool,
             server.getListenAddress(), DummyProtocol.class, false) {
           @Override
           public SumResponse call(NettyClientBase client) throws Exception {
@@ -116,6 +128,7 @@ public class TestBlockingRpc {
         }.withoutRetries();
 
     assertTrue(8.15d == response.getResult());
+    pool.close();
   }
 
   @Test
@@ -137,17 +150,51 @@ public class TestBlockingRpc {
   }
 
   @Test
+  public void testConnectionRetry() throws Exception {
+    retries = 10;
+    final InetSocketAddress address = server.getListenAddress();
+    tearDown();
+
+    EchoMessage message = EchoMessage.newBuilder()
+        .setMessage(MESSAGE).build();
+
+    //lazy startup
+    Thread serverThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          Thread.sleep(100);
+          server = new BlockingRpcServer(DummyProtocol.class, service, address, 2);
+        } catch (Exception e) {
+          fail(e.getMessage());
+        }
+        server.start();
+      }
+    });
+    serverThread.start();
+
+    clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+    client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+    stub = client.getStub();
+
+    EchoMessage response = stub.echo(null, message);
+    assertEquals(MESSAGE, response.getMessage());
+  }
+
+  @Test
   public void testConnectionFailed() throws Exception {
+    boolean expected = false;
     try {
       int port = server.getListenAddress().getPort() + 1;
       new BlockingRpcClient(DummyProtocol.class,
-          NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)));
+          NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
       fail("Connection should be failed.");
-    } catch (Throwable t) {
-      assertTrue(t instanceof IOException);
-      assertNotNull(t.getCause());
-      assertTrue(t.getCause() instanceof ConnectException);
+    } catch (ConnectException ce) {
+      expected = true;
+    } catch (Throwable ce){
+      fail();
     }
+    assertTrue(expected);
   }
 
   @Test
@@ -167,7 +214,6 @@ public class TestBlockingRpc {
               .build();
           stub.deley(null, message);
         } catch (Exception e) {
-          e.printStackTrace();
           error.append(e.getMessage());
         }
         synchronized(error) {
@@ -211,14 +257,17 @@ public class TestBlockingRpc {
 
   @Test
   public void testUnresolvedAddress() throws Exception {
+    client.close();
+    client = null;
+
     String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
-    BlockingRpcClient client = new BlockingRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort));
+    client = new BlockingRpcClient(DummyProtocol.class,
+        NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
     BlockingInterface stub = client.getStub();
 
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();
     EchoMessage response2 = stub.echo(null, message);
     assertEquals(MESSAGE, response2.getMessage());
-    client.close();
   }
 }
\ No newline at end of file