You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/05 07:15:58 UTC
[06/10] tajo git commit: TAJO-527: Upgrade to Netty 4
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index 0727f71..ed6b634 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -19,73 +19,125 @@
package org.apache.tajo.rpc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.*;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public final class RpcChannelFactory {
private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-
+
private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
- private static ClientSocketChannelFactory factory;
- private static AtomicInteger clientCount = new AtomicInteger(0);
+ private static final Object lockObjectForLoopGroup = new Object();
private static AtomicInteger serverCount = new AtomicInteger(0);
+ public enum ClientChannelId {
+ CLIENT_DEFAULT,
+ FETCHER
+ }
+
+ private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
+ new ConcurrentHashMap<ClientChannelId, Integer>();
+ private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
+ new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
+
private RpcChannelFactory(){
}
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
+
+ defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
+ defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
+ }
/**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
- */
- public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() {
- return getSharedClientChannelFactory(DEFAULT_WORKER_NUM);
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup() {
+ return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ *
+ * @param workerNum The number of workers
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
+ //shared worker and boss pool
+ return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
}
/**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ * Returns the shared EventLoopGroup for the given key. A fetcher client may have one or more EventLoopGroups for its throughput.
*
- * @param workerNum The number of workers
+ * @param clientId
+ * @param workerNum
+ * @return
*/
- public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){
- //shared woker and boss pool
- if(factory == null){
- factory = createClientChannelFactory("Internal-Client", workerNum);
+ public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
+ Queue<EventLoopGroup> eventLoopGroupQueue;
+ EventLoopGroup returnEventLoopGroup;
+
+ synchronized (lockObjectForLoopGroup) {
+ eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
+ if (eventLoopGroupQueue == null) {
+ eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
+ }
+
+ returnEventLoopGroup = eventLoopGroupQueue.poll();
+ if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+ returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
+ }
+ eventLoopGroupQueue.add(returnEventLoopGroup);
}
- return factory;
+
+ return returnEventLoopGroup;
+ }
+
+ protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+ return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
}
// Client must release the external resources
- public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
- name = name + "-" + clientCount.incrementAndGet();
- if(LOG.isDebugEnabled()){
- LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
+ protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
+ int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
+ Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
+ eventLoopGroupPool.put(clientId, loopGroupQueue);
+
+ for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
+ loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
}
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
- ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
+ return loopGroupQueue;
+ }
- NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
- new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
- NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
- ThreadNameDeterminer.CURRENT);
+ protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
+ }
+
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
- return new NioClientSocketChannelFactory(bossPool, workerPool);
+ return new NioEventLoopGroup(workerNum, clientFactory);
}
// Client must release the external resources
- public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) {
+ public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
name = name + "-" + serverCount.incrementAndGet();
if(LOG.isInfoEnabled()){
LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
@@ -93,22 +145,38 @@ public final class RpcChannelFactory {
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-
- NioServerBossPool bossPool =
- new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT);
- NioWorkerPool workerPool =
- new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT);
-
- return new NioServerSocketChannelFactory(bossPool, workerPool);
+
+ EventLoopGroup bossGroup =
+ new NioEventLoopGroup(1, bossFactory);
+ EventLoopGroup workerGroup =
+ new NioEventLoopGroup(workerNum, workerFactory);
+
+ return new ServerBootstrap().group(bossGroup, workerGroup);
}
- public static synchronized void shutdown(){
+ public static void shutdownGracefully(){
if(LOG.isDebugEnabled()) {
LOG.debug("Shutdown Shared RPC Pool");
}
- if (factory != null) {
- factory.releaseExternalResources();
+
+ synchronized(lockObjectForLoopGroup) {
+ for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
+ for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
+ eventLoopGroup.shutdownGracefully();
+ }
+
+ eventLoopGroupQueue.clear();
+ }
+ eventLoopGroupPool.clear();
+ }
+ }
+
+ static class CleanUpHandler extends Thread {
+
+ @Override
+ public void run() {
+ RpcChannelFactory.shutdownGracefully();
}
- factory = null;
+
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index c8e622b..4ad9771 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -21,79 +21,71 @@ package org.apache.tajo.rpc;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.logging.CommonsLoggerFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
public class RpcConnectionPool {
private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
- private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections =
- new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>();
- private ChannelGroup accepted = new DefaultChannelGroup();
+ private Map<RpcConnectionKey, NettyClientBase> connections =
+ new HashMap<RpcConnectionKey, NettyClientBase>();
+ private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static RpcConnectionPool instance;
- private final ClientSocketChannelFactory channelFactory;
+ private final Object lockObject = new Object();
public final static int RPC_RETRIES = 3;
- private RpcConnectionPool(ClientSocketChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
+ private RpcConnectionPool() {
}
public synchronized static RpcConnectionPool getPool() {
if(instance == null) {
InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
- instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory());
+ instance = new RpcConnectionPool();
}
return instance;
}
- public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) {
- return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
- }
-
private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
NettyClientBase client;
if(rpcConnectionKey.asyncMode) {
- client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+ client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
+ RPC_RETRIES);
} else {
- client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+ client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
+ RPC_RETRIES);
}
accepted.add(client.getChannel());
return client;
}
public NettyClientBase getConnection(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode)
+ Class<?> protocolClass, boolean asyncMode)
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
NettyClientBase client = connections.get(key);
if (client == null) {
- boolean added;
- synchronized (connections){
- client = makeConnection(key);
- connections.put(key, client);
- added = true;
- }
-
- if (!added) {
- client.close();
+ synchronized (lockObject){
client = connections.get(key);
+ if (client == null) {
+ client = makeConnection(key);
+ connections.put(key, client);
+ }
}
}
- if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) {
+ if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
LOG.warn("Try to reconnect : " + addr);
client.connect(addr);
}
@@ -104,9 +96,11 @@ public class RpcConnectionPool {
if (client == null) return;
try {
- if (!client.getChannel().isOpen()) {
- connections.remove(client.getKey());
- client.close();
+ synchronized (lockObject) {
+ if (!client.getChannel().isOpen()) {
+ connections.remove(client.getKey());
+ client.close();
+ }
}
if(LOG.isDebugEnabled()) {
@@ -128,8 +122,10 @@ public class RpcConnectionPool {
LOG.debug("Close connection [" + client.getKey() + "]");
}
- connections.remove(client.getKey());
- client.close();
+ synchronized (lockObject) {
+ connections.remove(client.getKey());
+ client.close();
+ }
} catch (Exception e) {
LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
@@ -140,7 +136,7 @@ public class RpcConnectionPool {
if(LOG.isDebugEnabled()) {
LOG.debug("Pool Closed");
}
- synchronized(connections) {
+ synchronized(lockObject) {
for(NettyClientBase eachClient: connections.values()) {
try {
eachClient.close();
@@ -148,11 +144,12 @@ public class RpcConnectionPool {
LOG.error("close client pool error", e);
}
}
+
+ connections.clear();
}
- connections.clear();
try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ accepted.close();
} catch (Throwable t) {
LOG.error(t);
}
@@ -160,18 +157,16 @@ public class RpcConnectionPool {
public synchronized void shutdown(){
close();
- if(channelFactory != null){
- channelFactory.releaseExternalResources();
- }
+ RpcChannelFactory.shutdownGracefully();
}
static class RpcConnectionKey {
final InetSocketAddress addr;
- final Class protocolClass;
+ final Class<?> protocolClass;
final boolean asyncMode;
public RpcConnectionKey(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode) {
+ Class<?> protocolClass, boolean asyncMode) {
this.addr = addr;
this.protocolClass = protocolClass;
this.asyncMode = asyncMode;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index 140f781..fb1cec2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,30 +18,30 @@
package org.apache.tajo.rpc;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import com.google.protobuf.ServiceException;
+
public abstract class ServerCallable<T> {
protected InetSocketAddress addr;
protected long startTime;
protected long endTime;
- protected Class protocol;
+ protected Class<?> protocol;
protected boolean asyncMode;
protected boolean closeConn;
protected RpcConnectionPool connPool;
public abstract T call(NettyClientBase client) throws Exception;
- public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol, boolean asyncMode) {
+ public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
this(connPool, addr, protocol, asyncMode, false);
}
- public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol,
+ public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
boolean asyncMode, boolean closeConn) {
this.connPool = connPool;
this.addr = addr;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 61a92bc..31d5265 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -27,13 +27,21 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import io.netty.channel.ConnectTimeoutException;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -47,43 +55,102 @@ public class TestAsyncRpc {
double sum;
String echo;
- static AsyncRpcServer server;
- static AsyncRpcClient client;
- static Interface stub;
- static DummyProtocolAsyncImpl service;
- ClientSocketChannelFactory clientChannelFactory;
+ AsyncRpcServer server;
+ AsyncRpcClient client;
+ Interface stub;
+ DummyProtocolAsyncImpl service;
int retries;
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @interface SetupRpcConnection {
+ boolean setupRpcServer() default true;
+ boolean setupRpcClient() default true;
+ }
+
+ @Rule
+ public ExternalResource resource = new ExternalResource() {
+
+ private Description description;
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ this.description = description;
+ return super.apply(base, description);
+ }
- @Before
- public void setUp() throws Exception {
- retries = 1;
+ @Override
+ protected void before() throws Throwable {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ setUpRpcServer();
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ setUpRpcClient();
+ }
+ }
+
+ @Override
+ protected void after() {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2);
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ try {
+ tearDownRpcClient();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ try {
+ tearDownRpcServer();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ };
+
+ public void setUpRpcServer() throws Exception {
service = new DummyProtocolAsyncImpl();
server = new AsyncRpcServer(DummyProtocol.class,
service, new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
+ }
+
+ public void setUpRpcClient() throws Exception {
+ retries = 1;
+
client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), retries);
stub = client.getStub();
}
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
-
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ public void tearDownRpcServer() throws Exception {
if(server != null) {
server.shutdown();
+ server = null;
}
-
- if (clientChannelFactory != null) {
- clientChannelFactory.releaseExternalResources();
+ }
+
+ public void tearDownRpcClient() throws Exception {
+ if(client != null) {
+ client.close();
+ client = null;
}
}
boolean calledMarker = false;
+
@Test
public void testRpc() throws Exception {
@@ -130,7 +197,7 @@ public class TestAsyncRpc {
testNullLatch.countDown();
}
});
- testNullLatch.await(1000, TimeUnit.MILLISECONDS);
+ assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(service.getNullCalled);
}
@@ -169,8 +236,7 @@ public class TestAsyncRpc {
.setMessage(MESSAGE).build();
CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- server.shutdown();
- server = null;
+ tearDownRpcServer();
stub.echo(future.getController(), echoMessage, future);
EchoMessage response = future.get();
@@ -187,8 +253,10 @@ public class TestAsyncRpc {
.setMessage(MESSAGE).build();
CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- server.shutdown();
- server = null;
+ if (server != null) {
+ server.shutdown(true);
+ server = null;
+ }
stub = client.getStub();
stub.echo(future.getController(), echoMessage, future);
@@ -200,10 +268,13 @@ public class TestAsyncRpc {
}
@Test
+ @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
public void testConnectionRetry() throws Exception {
retries = 10;
- final InetSocketAddress address = server.getListenAddress();
- tearDown();
+ ServerSocket serverSocket = new ServerSocket(0);
+ final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+ serverSocket.close();
+ service = new DummyProtocolAsyncImpl();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -214,7 +285,7 @@ public class TestAsyncRpc {
@Override
public void run() {
try {
- Thread.sleep(100);
+ Thread.sleep(1000);
server = new AsyncRpcServer(DummyProtocol.class,
service, address, 2);
} catch (Exception e) {
@@ -225,8 +296,7 @@ public class TestAsyncRpc {
});
serverThread.start();
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
- client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ client = new AsyncRpcClient(DummyProtocol.class, address, retries);
stub = client.getStub();
stub.echo(future.getController(), echoMessage, future);
@@ -240,7 +310,7 @@ public class TestAsyncRpc {
InetSocketAddress address = new InetSocketAddress("test", 0);
boolean expected = false;
try {
- new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ new AsyncRpcClient(DummyProtocol.class, address, retries);
fail();
} catch (ConnectTimeoutException e) {
expected = true;
@@ -251,13 +321,11 @@ public class TestAsyncRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
- client.close();
- client = null;
-
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), retries);
Interface stub = client.getStub();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 746bfcb..07e2dca 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -24,13 +24,20 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
import org.junit.Test;
-
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,35 +51,92 @@ public class TestBlockingRpc {
private BlockingInterface stub;
private DummyProtocolBlockingImpl service;
private int retries;
- private ClientSocketChannelFactory clientChannelFactory;
-
- @Before
- public void setUp() throws Exception {
- retries = 1;
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @interface SetupRpcConnection {
+ boolean setupRpcServer() default true;
+ boolean setupRpcClient() default true;
+ }
+
+ @Rule
+ public ExternalResource resource = new ExternalResource() {
+
+ private Description description;
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ this.description = description;
+ return super.apply(base, description);
+ }
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+ @Override
+ protected void before() throws Throwable {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ setUpRpcServer();
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ setUpRpcClient();
+ }
+ }
+ @Override
+ protected void after() {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ try {
+ tearDownRpcClient();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ try {
+ tearDownRpcServer();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ };
+
+ public void setUpRpcServer() throws Exception {
service = new DummyProtocolBlockingImpl();
server = new BlockingRpcServer(DummyProtocol.class, service,
new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
+ }
+
+ public void setUpRpcClient() throws Exception {
+ retries = 1;
+
client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), retries);
stub = client.getStub();
}
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
-
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ public void tearDownRpcServer() throws Exception {
if(server != null) {
server.shutdown();
+ server = null;
}
-
- if(clientChannelFactory != null){
- clientChannelFactory.releaseExternalResources();
+ }
+
+ public void tearDownRpcClient() throws Exception {
+ if(client != null) {
+ client.close();
+ client = null;
}
}
@@ -93,8 +157,9 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testRpcWithServiceCallable() throws Exception {
- RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2);
+ RpcConnectionPool pool = RpcConnectionPool.getPool();
final SumRequest request = SumRequest.newBuilder()
.setX1(1)
.setX2(2)
@@ -148,10 +213,12 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
public void testConnectionRetry() throws Exception {
retries = 10;
- final InetSocketAddress address = server.getListenAddress();
- tearDown();
+ ServerSocket serverSocket = new ServerSocket(0);
+ final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+ serverSocket.close();
EchoMessage message = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -161,8 +228,8 @@ public class TestBlockingRpc {
@Override
public void run() {
try {
- Thread.sleep(100);
- server = new BlockingRpcServer(DummyProtocol.class, service, address, 2);
+ Thread.sleep(1000);
+ server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
} catch (Exception e) {
fail(e.getMessage());
}
@@ -171,8 +238,7 @@ public class TestBlockingRpc {
});
serverThread.start();
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
- client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ client = new BlockingRpcClient(DummyProtocol.class, address, retries);
stub = client.getStub();
EchoMessage response = stub.echo(null, message);
@@ -182,14 +248,20 @@ public class TestBlockingRpc {
@Test
public void testConnectionFailed() throws Exception {
boolean expected = false;
+ NettyClientBase client = null;
+
try {
int port = server.getListenAddress().getPort() + 1;
- new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
+ client = new BlockingRpcClient(DummyProtocol.class,
+ RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
+ client.close();
fail("Connection should be failed.");
} catch (ConnectException ce) {
expected = true;
} catch (Throwable ce){
+ if (client != null) {
+ client.close();
+ }
fail();
}
assertTrue(expected);
@@ -240,7 +312,7 @@ public class TestBlockingRpc {
};
shutdownThread.start();
- latch.await(5 * 1000, TimeUnit.MILLISECONDS);
+ assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
assertTrue(latch.getCount() == 0);
@@ -254,13 +326,11 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
- client.close();
- client = null;
-
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), retries);
BlockingInterface stub = client.getStub();
EchoMessage message = EchoMessage.newBuilder()
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
index 90499ce..0ca7563 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
@@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-@SuppressWarnings("UnusedDeclaration")
public class DummyProtocolAsyncImpl implements Interface {
private static final Log LOG =
LogFactory.getLog(DummyProtocolAsyncImpl.class);
@@ -74,7 +73,7 @@ public class DummyProtocolAsyncImpl implements Interface {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage());
}
done.run(request);
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 5513aa6..957b4c1 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -168,6 +168,18 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
index cf8a54e..389cd31 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -21,13 +21,16 @@ package org.apache.tajo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
@@ -38,20 +41,20 @@ public class HttpFileServer {
private final InetSocketAddress addr;
private InetSocketAddress bindAddr;
private ServerBootstrap bootstrap = null;
- private ChannelFactory factory = null;
+ private EventLoopGroup eventloopGroup = null;
private ChannelGroup channelGroup = null;
public HttpFileServer(final InetSocketAddress addr) {
this.addr = addr;
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- 2);
+ this.eventloopGroup = new NioEventLoopGroup(2, Executors.defaultThreadFactory());
// Configure the server.
- this.bootstrap = new ServerBootstrap(factory);
- // Set up the event pipeline factory.
- this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
- this.channelGroup = new DefaultChannelGroup();
+ this.bootstrap = new ServerBootstrap();
+ this.bootstrap.childHandler(new HttpFileServerChannelInitializer())
+ .group(eventloopGroup)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .channel(NioServerSocketChannel.class);
+ this.channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
public HttpFileServer(String bindaddr) {
@@ -60,9 +63,9 @@ public class HttpFileServer {
public void start() {
// Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(addr);
- channelGroup.add(channel);
- this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+ ChannelFuture future = bootstrap.bind(addr).syncUninterruptibly();
+ channelGroup.add(future.channel());
+ this.bindAddr = (InetSocketAddress) future.channel().localAddress();
LOG.info("HttpFileServer starts up ("
+ this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+ ")");
@@ -73,9 +76,8 @@ public class HttpFileServer {
}
public void stop() {
- ChannelGroupFuture future = channelGroup.close();
- future.awaitUninterruptibly();
- factory.releaseExternalResources();
+ channelGroup.close();
+ eventloopGroup.shutdownGracefully();
LOG.info("HttpFileServer shutdown ("
+ this.bindAddr.getAddress().getHostAddress() + ":"
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
new file mode 100644
index 0000000..f2a97b6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+/** Channel initializer that HttpFileServer installs as its child handler; it assembles the HTTP pipeline for a channel. */
+public class HttpFileServerChannelInitializer extends ChannelInitializer<Channel> {
+ // Every handler below is created with `new` inside initChannel, so no handler instance is shared across calls.
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+
+ // Uncomment the following lines if you want HTTPS
+ //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ //engine.setUseClientMode(false);
+ //pipeline.addLast("ssl", new SslHandler(engine));
+ // HTTP codec handlers, registered under the names "encoder", "decoder", "aggregator" and "chunkedWriter".
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // NOTE(review): 65536 is presumably the max aggregated content length in bytes - confirm against the Netty API
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+ // Application handler goes last, after the codec handlers above.
+ pipeline.addLast("handler", new HttpFileServerHandler());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
index 6c77317..78902f3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -18,16 +18,13 @@
package org.apache.tajo;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
@@ -35,39 +32,34 @@ import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-/**
- * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+ private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+
+ if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
final String path = sanitizeUri(request.getUri());
if (path == null) {
- sendError(ctx, FORBIDDEN);
+ sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
File file = new File(path);
if (file.isHidden() || !file.exists()) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
if (!file.isFile()) {
- sendError(ctx, FORBIDDEN);
+ sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
@@ -75,62 +67,62 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
long fileLength = raf.length();
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- setContentLength(response, fileLength);
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ HttpHeaders.setContentLength(response, fileLength);
setContentTypeHeader(response);
- Channel ch = e.getChannel();
-
// Write the initial line and the header.
- ch.write(response);
+ ctx.write(response);
// Write the content.
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+ lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)));
} else {
// No encryption - use zero-copy.
- final FileRegion region =
- new DefaultFileRegion(raf.getChannel(), 0, fileLength);
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureProgressListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
+ final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+ writeFuture = ctx.write(region);
+ lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ writeFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
+ throws Exception {
+ LOG.trace(String.format("%s: %d / %d", path, progress, total));
}
- public void operationProgressed(
- ChannelFuture future, long amount, long current, long total) {
- System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) throws Exception {
+ region.release();
}
});
}
// Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
+ if (!HttpHeaders.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
+ sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
+ LOG.error(cause.getMessage(), cause);
+ if (ch.isActive()) {
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@@ -161,14 +153,13 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n",
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+ Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n",
CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
@@ -178,7 +169,7 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
* HTTP response
*/
private static void setContentTypeHeader(HttpResponse response) {
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following lines if you want HTTPS
- //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
- //engine.setUseClientMode(false);
- //pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
- pipeline.addLast("handler", new HttpFileServerHandler());
- return pipeline;
- }
-}
\ No newline at end of file