You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/09/20 09:22:05 UTC
git commit: TAJO-1050: RPC client does not retry during connecting.
(Jihun Kang via jinho)
Repository: tajo
Updated Branches:
refs/heads/master 28282b561 -> 5b31fc420
TAJO-1050: RPC client does not retry during connecting. (Jihun Kang via jinho)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5b31fc42
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5b31fc42
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5b31fc42
Branch: refs/heads/master
Commit: 5b31fc4207ef1de2418cafbc6d3e714fa03849d0
Parents: 28282b5
Author: jhkim <jh...@apache.org>
Authored: Sat Sep 20 16:21:33 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Sat Sep 20 16:21:33 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 21 +++--
.../java/org/apache/tajo/client/TajoClient.java | 20 +++--
.../tajo/master/TajoMasterClientService.java | 2 +
.../org/apache/tajo/rpc/AsyncRpcClient.java | 11 +--
.../org/apache/tajo/rpc/AsyncRpcServer.java | 4 +-
.../org/apache/tajo/rpc/BlockingRpcClient.java | 13 +--
.../org/apache/tajo/rpc/NettyClientBase.java | 86 +++++++++++-------
.../org/apache/tajo/rpc/RpcConnectionPool.java | 16 +++-
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 91 +++++++++++++++++++-
.../org/apache/tajo/rpc/TestBlockingRpc.java | 73 +++++++++++++---
11 files changed, 261 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5d92881..7b2f77e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -143,6 +143,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-1050: RPC client does not retry during connecting.
+ (Jihun Kang via jinho)
+
TAJO-948: 'INSERT INTO' statement to non existence table casuses
NPE. (Jongyoung Park via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 7c96e34..d5040bd 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -21,8 +21,6 @@ package org.apache.tajo.cli;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
-import jline.TerminalFactory;
-import jline.TerminalFactory.Flavor;
import jline.UnsupportedTerminal;
import jline.console.ConsoleReader;
import org.apache.commons.cli.*;
@@ -36,13 +34,15 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.HAServiceUtil;
import java.io.*;
import java.lang.reflect.Constructor;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
import static org.apache.tajo.cli.ParsedResult.StatementType.META;
import static org.apache.tajo.cli.ParsedResult.StatementType.STATEMENT;
@@ -401,6 +401,7 @@ public class TajoCli {
history.addStatement(parsed.getHistoryStatement() + (parsed.getType() == STATEMENT ? ";" : ""));
}
}
+
exitCode = executeParsedResults(parsedResults);
currentPrompt = updatePrompt(parser.getState());
@@ -494,7 +495,17 @@ public class TajoCli {
private int executeQuery(String statement) throws ServiceException, IOException {
checkMasterStatus();
long startTime = System.currentTimeMillis();
- ClientProtos.SubmitQueryResponse response = client.executeQuery(statement);
+ ClientProtos.SubmitQueryResponse response = null;
+ try{
+ response = client.executeQuery(statement);
+ } catch (ServiceException e){
+ displayFormatter.printErrorMessage(sout, e.getMessage());
+ wasError = true;
+ } catch(Throwable te){
+ displayFormatter.printErrorMessage(sout, te);
+ wasError = true;
+ }
+
if (response == null) {
displayFormatter.printErrorMessage(sout, "response is null");
wasError = true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index cc993f3..ab3d874 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -61,6 +61,7 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
@@ -82,6 +83,8 @@ public class TajoClient implements Closeable {
private volatile TajoIdProtos.SessionIdProto sessionId;
+ private AtomicBoolean closed = new AtomicBoolean(false);
+
public TajoClient(TajoConf conf) throws IOException {
this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
}
@@ -115,11 +118,14 @@ public class TajoClient implements Closeable {
}
public boolean isConnected() {
- try {
- return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
- } catch (Exception e) {
- return false;
+ if(!closed.get()){
+ try {
+ return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
+ } catch (Throwable e) {
+ return false;
+ }
}
+ return false;
}
public TajoClient(InetSocketAddress addr) throws IOException {
@@ -152,12 +158,16 @@ public class TajoClient implements Closeable {
@Override
public void close() {
+ if(closed.getAndSet(true)){
+ return;
+ }
+
// remove session
try {
NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
tajoMaster.removeSession(null, sessionId);
- } catch (Exception e) {
+ } catch (Throwable e) {
}
if(connPool != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index e69393a..738b643 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -264,6 +264,8 @@ public class TajoMasterClientService extends AbstractService {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
SubmitQueryResponse.Builder responseBuilder = ClientProtos.SubmitQueryResponse.newBuilder();
+ responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+ responseBuilder.setIsForwarded(true);
responseBuilder.setUserName(context.getConf().getVar(ConfVars.USERNAME));
responseBuilder.setResultCode(ResultCode.ERROR);
if (e.getMessage() != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index c84d6b6..7a416a8 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -57,13 +57,8 @@ public class AsyncRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
AsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr) throws Exception {
- this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
- }
-
- AsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, ClientSocketChannelFactory factory)
- throws Exception {
+ final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+ throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
@@ -74,7 +69,7 @@ public class AsyncRpcClient extends NettyClientBase {
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory, factory);
+ super.init(addr, pipeFactory, factory, retries);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, true);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index b7e3cb6..f9c5d3b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -126,11 +126,13 @@ public class AsyncRpcServer extends NettyServerBase {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception{
+
if (e.getCause() instanceof RemoteCallException) {
RemoteCallException callException = (RemoteCallException) e.getCause();
e.getChannel().write(callException.getResponse());
+ } else {
+ LOG.error(e.getCause());
}
- throw new RemoteException(serviceName, e.getCause());
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 3d6989a..03d5d3e 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -60,13 +60,8 @@ public class BlockingRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
BlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr) throws Exception {
- this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
- }
-
- BlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, ClientSocketChannelFactory factory)
- throws Exception {
+ final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+ throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
this.protocol = protocol;
String serviceClassName = protocol.getName() + "$"
@@ -78,7 +73,7 @@ public class BlockingRpcClient extends NettyClientBase {
this.handler = new ClientChannelUpstreamHandler();
pipeFactory = new ProtoPipelineFactory(handler,
RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory, factory);
+ super.init(addr, pipeFactory, factory, retries);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, false);
@@ -227,7 +222,7 @@ public class BlockingRpcClient extends NettyClientBase {
}
}
- class ProtoCallFuture implements Future<Message> {
+ static class ProtoCallFuture implements Future<Message> {
private Semaphore sem = new Semaphore(0);
private Message response = null;
private Message returnType;
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 711c527..d0002de 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,25 +18,25 @@
package org.apache.tajo.rpc;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.util.NetUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import java.io.Closeable;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public abstract class NettyClientBase implements Closeable {
private static Log LOG = LogFactory.getLog(NettyClientBase.class);
private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60;
+ private static final long PAUSE = 1000; // 1 sec
+ private int numRetries;
protected ClientBootstrap bootstrap;
private ChannelFuture channelFuture;
@@ -46,40 +46,54 @@ public abstract class NettyClientBase implements Closeable {
public abstract <T> T getStub();
public abstract RpcConnectionPool.RpcConnectionKey getKey();
+
+ public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory,
+ int numRetries) throws ConnectTimeoutException {
+ this.numRetries = numRetries;
+
+ init(addr, pipeFactory, factory);
+ }
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
- throws IOException {
- try {
-
- this.bootstrap = new ClientBootstrap(factory);
- this.bootstrap.setPipelineFactory(pipeFactory);
- // TODO - should be configurable
- this.bootstrap.setOption("connectTimeoutMillis", 10000);
- this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
- this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
- this.bootstrap.setOption("tcpNoDelay", true);
- this.bootstrap.setOption("keepAlive", true);
-
- connect(addr);
- } catch (IOException e) {
- close();
- throw e;
- } catch (Throwable t) {
- throw new IOException("Connect error to " + addr + " cause " + t.getMessage(), t.getCause());
- }
+ throws ConnectTimeoutException {
+ this.bootstrap = new ClientBootstrap(factory);
+ this.bootstrap.setPipelineFactory(pipeFactory);
+ // TODO - should be configurable
+ this.bootstrap.setOption("connectTimeoutMillis", 10000);
+ this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
+ this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
+ this.bootstrap.setOption("tcpNoDelay", true);
+ this.bootstrap.setOption("keepAlive", true);
+
+ connect(addr);
}
-
- public void connect(InetSocketAddress addr) throws Exception {
- if(addr.isUnresolved()){
- addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
- }
+
+ private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
this.channelFuture = bootstrap.connect(addr);
final CountDownLatch latch = new CountDownLatch(1);
this.channelFuture.addListener(new ChannelFutureListener() {
+ private final AtomicInteger retryCount = new AtomicInteger();
+
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- latch.countDown();
+ if (!future.isSuccess()) {
+ if (numRetries > retryCount.getAndIncrement()) {
+ Thread.sleep(PAUSE);
+ channelFuture = bootstrap.connect(addr);
+ channelFuture.addListener(this);
+
+ LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect.");
+ }
+ else {
+ latch.countDown();
+
+ LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+ }
+ }
+ else {
+ latch.countDown();
+ }
}
});
@@ -88,12 +102,20 @@ public abstract class NettyClientBase implements Closeable {
} catch (InterruptedException e) {
}
-
if (!channelFuture.isSuccess()) {
- throw new RuntimeException(channelFuture.getCause());
+ throw new ConnectTimeoutException("Connect error to " + addr +
+ " caused by " + ExceptionUtils.getMessage(channelFuture.getCause()));
}
}
+ public void connect(InetSocketAddress addr) throws ConnectTimeoutException {
+ if(addr.isUnresolved()){
+ addr = NetUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+ }
+
+ handleConnectionInternally(addr);
+ }
+
public boolean isConnected() {
return getChannel().isConnected();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index aba9c63..2f3d433 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -22,9 +22,12 @@ import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.conf.TajoConf;
+import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.logging.CommonsLoggerFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +45,8 @@ public class RpcConnectionPool {
private final ClientSocketChannelFactory channelFactory;
private final TajoConf conf;
+ public final static int RPC_RETRIES = 3;
+
private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
this.conf = conf;
this.channelFactory = channelFactory;
@@ -49,6 +54,7 @@ public class RpcConnectionPool {
public synchronized static RpcConnectionPool getPool(TajoConf conf) {
if(instance == null) {
+ InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
}
return instance;
@@ -58,19 +64,21 @@ public class RpcConnectionPool {
return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
}
- private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws Exception {
+ private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
NettyClientBase client;
if(rpcConnectionKey.asyncMode) {
- client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
+ client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
} else {
- client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
+ client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
}
accepted.add(client.getChannel());
return client;
}
public NettyClientBase getConnection(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode) throws Exception {
+ Class protocolClass, boolean asyncMode)
+ throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
NettyClientBase client = connections.get(key);
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 72223c1..7c8246a 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -28,6 +28,8 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.ConnectTimeoutException;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -50,15 +52,20 @@ public class TestAsyncRpc {
static AsyncRpcClient client;
static Interface stub;
static DummyProtocolAsyncImpl service;
+ ClientSocketChannelFactory clientChannelFactory;
+ int retries;
@Before
public void setUp() throws Exception {
+ retries = 1;
+
+ clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2);
service = new DummyProtocolAsyncImpl();
server = new AsyncRpcServer(DummyProtocol.class,
service, new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new AsyncRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()));
+ NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
stub = client.getStub();
}
@@ -67,9 +74,14 @@ public class TestAsyncRpc {
if(client != null) {
client.close();
}
+
if(server != null) {
server.shutdown();
}
+
+ if (clientChannelFactory != null) {
+ clientChannelFactory.releaseExternalResources();
+ }
}
boolean calledMarker = false;
@@ -170,9 +182,83 @@ public class TestAsyncRpc {
}
@Test
+ public void testStubDisconnected() throws Exception {
+
+ EchoMessage echoMessage = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+ server.shutdown();
+ server = null;
+
+ stub = client.getStub();
+ stub.echo(future.getController(), echoMessage, future);
+ EchoMessage response = future.get();
+
+ assertNull(response);
+ assertTrue(future.getController().failed());
+ assertTrue(future.getController().errorText() != null);
+ }
+
+ @Test
+ public void testConnectionRetry() throws Exception {
+ retries = 10;
+ final InetSocketAddress address = server.getListenAddress();
+ tearDown();
+
+ EchoMessage echoMessage = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+ CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
+
+ //lazy startup
+ Thread serverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ server = new AsyncRpcServer(DummyProtocol.class,
+ service, address, 2);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ server.start();
+ }
+ });
+ serverThread.start();
+
+ clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+ client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ stub = client.getStub();
+ stub.echo(future.getController(), echoMessage, future);
+
+ assertFalse(future.isDone());
+ assertEquals(echoMessage, future.get());
+ assertTrue(future.isDone());
+ }
+
+ @Test
+ public void testConnectionFailure() throws Exception {
+ InetSocketAddress address = new InetSocketAddress("test", 0);
+ boolean expected = false;
+ try {
+ new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ fail();
+ } catch (ConnectTimeoutException e) {
+ expected = true;
+ } catch (Throwable throwable) {
+ fail();
+ }
+ assertTrue(expected);
+ }
+
+ @Test
public void testUnresolvedAddress() throws Exception {
+ client.close();
+ client = null;
+
String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
- AsyncRpcClient client = new AsyncRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort));
+ client = new AsyncRpcClient(DummyProtocol.class,
+ NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
Interface stub = client.getStub();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -182,6 +268,5 @@ public class TestAsyncRpc {
assertFalse(future.isDone());
assertEquals(future.get(), echoMessage);
assertTrue(future.isDone());
- client.close();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/5b31fc42/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 3fc51c6..28a3fad 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -26,11 +26,11 @@ import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
import org.apache.tajo.util.NetUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
@@ -45,15 +45,21 @@ public class TestBlockingRpc {
private BlockingRpcClient client;
private BlockingInterface stub;
private DummyProtocolBlockingImpl service;
+ private int retries;
+ private ClientSocketChannelFactory clientChannelFactory;
@Before
public void setUp() throws Exception {
+ retries = 1;
+
+ clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+
service = new DummyProtocolBlockingImpl();
server = new BlockingRpcServer(DummyProtocol.class, service,
new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
client = new BlockingRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(server.getListenAddress()));
+ NetUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
stub = client.getStub();
}
@@ -62,9 +68,14 @@ public class TestBlockingRpc {
if(client != null) {
client.close();
}
+
if(server != null) {
server.shutdown();
}
+
+ if(clientChannelFactory != null){
+ clientChannelFactory.releaseExternalResources();
+ }
}
@Test
@@ -85,6 +96,7 @@ public class TestBlockingRpc {
@Test
public void testRpcWithServiceCallable() throws Exception {
+ RpcConnectionPool pool = RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2);
final SumRequest request = SumRequest.newBuilder()
.setX1(1)
.setX2(2)
@@ -92,7 +104,7 @@ public class TestBlockingRpc {
.setX4(2.0f).build();
SumResponse response =
- new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+ new ServerCallable<SumResponse>(pool,
server.getListenAddress(), DummyProtocol.class, false) {
@Override
public SumResponse call(NettyClientBase client) throws Exception {
@@ -105,7 +117,7 @@ public class TestBlockingRpc {
assertEquals(8.15d, response.getResult(), 1e-15);
response =
- new ServerCallable<SumResponse>(RpcConnectionPool.newPool(new TajoConf(), getClass().getSimpleName(), 2),
+ new ServerCallable<SumResponse>(pool,
server.getListenAddress(), DummyProtocol.class, false) {
@Override
public SumResponse call(NettyClientBase client) throws Exception {
@@ -116,6 +128,7 @@ public class TestBlockingRpc {
}.withoutRetries();
assertTrue(8.15d == response.getResult());
+ pool.close();
}
@Test
@@ -137,17 +150,51 @@ public class TestBlockingRpc {
}
@Test
+ public void testConnectionRetry() throws Exception {
+ retries = 10;
+ final InetSocketAddress address = server.getListenAddress();
+ tearDown();
+
+ EchoMessage message = EchoMessage.newBuilder()
+ .setMessage(MESSAGE).build();
+
+ //lazy startup
+ Thread serverThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(100);
+ server = new BlockingRpcServer(DummyProtocol.class, service, address, 2);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ server.start();
+ }
+ });
+ serverThread.start();
+
+ clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+ client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ stub = client.getStub();
+
+ EchoMessage response = stub.echo(null, message);
+ assertEquals(MESSAGE, response.getMessage());
+ }
+
+ @Test
public void testConnectionFailed() throws Exception {
+ boolean expected = false;
try {
int port = server.getListenAddress().getPort() + 1;
new BlockingRpcClient(DummyProtocol.class,
- NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)));
+ NetUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
fail("Connection should be failed.");
- } catch (Throwable t) {
- assertTrue(t instanceof IOException);
- assertNotNull(t.getCause());
- assertTrue(t.getCause() instanceof ConnectException);
+ } catch (ConnectException ce) {
+ expected = true;
+ } catch (Throwable ce){
+ fail();
}
+ assertTrue(expected);
}
@Test
@@ -167,7 +214,6 @@ public class TestBlockingRpc {
.build();
stub.deley(null, message);
} catch (Exception e) {
- e.printStackTrace();
error.append(e.getMessage());
}
synchronized(error) {
@@ -211,14 +257,17 @@ public class TestBlockingRpc {
@Test
public void testUnresolvedAddress() throws Exception {
+ client.close();
+ client = null;
+
String hostAndPort = NetUtils.normalizeInetSocketAddress(server.getListenAddress());
- BlockingRpcClient client = new BlockingRpcClient(DummyProtocol.class, NetUtils.createUnresolved(hostAndPort));
+ client = new BlockingRpcClient(DummyProtocol.class,
+ NetUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
BlockingInterface stub = client.getStub();
EchoMessage message = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
EchoMessage response2 = stub.echo(null, message);
assertEquals(MESSAGE, response2.getMessage());
- client.close();
}
}
\ No newline at end of file