You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/03 14:11:22 UTC
[1/3] tajo git commit: TAJO-527: Upgrade to Netty 4
Repository: tajo
Updated Branches:
refs/heads/master 64e47a401 -> 22876a825
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
index 0727f71..ed6b634 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -19,73 +19,125 @@
package org.apache.tajo.rpc;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.*;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import java.util.concurrent.Executors;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public final class RpcChannelFactory {
private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-
+
private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
- private static ClientSocketChannelFactory factory;
- private static AtomicInteger clientCount = new AtomicInteger(0);
+ private static final Object lockObjectForLoopGroup = new Object();
private static AtomicInteger serverCount = new AtomicInteger(0);
+ public enum ClientChannelId {
+ CLIENT_DEFAULT,
+ FETCHER
+ }
+
+ private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
+ new ConcurrentHashMap<ClientChannelId, Integer>();
+ private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
+ new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
+
private RpcChannelFactory(){
}
+
+ static {
+ Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
+
+ defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
+ defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
+ }
/**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
- */
- public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() {
- return getSharedClientChannelFactory(DEFAULT_WORKER_NUM);
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup() {
+ return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
+ }
+
+ /**
+ * make this factory static thus all clients can share its thread pool.
+ * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ *
+ * @param workerNum The number of workers
+ */
+ public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
+ //shared woker and boss pool
+ return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
}
/**
- * make this factory static thus all clients can share its thread pool.
- * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
+ * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
*
- * @param workerNum The number of workers
+ * @param clientId
+ * @param workerNum
+ * @return
*/
- public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){
- //shared woker and boss pool
- if(factory == null){
- factory = createClientChannelFactory("Internal-Client", workerNum);
+ public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
+ Queue<EventLoopGroup> eventLoopGroupQueue;
+ EventLoopGroup returnEventLoopGroup;
+
+ synchronized (lockObjectForLoopGroup) {
+ eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
+ if (eventLoopGroupQueue == null) {
+ eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
+ }
+
+ returnEventLoopGroup = eventLoopGroupQueue.poll();
+ if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+ returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
+ }
+ eventLoopGroupQueue.add(returnEventLoopGroup);
}
- return factory;
+
+ return returnEventLoopGroup;
+ }
+
+ protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+ return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
}
// Client must release the external resources
- public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
- name = name + "-" + clientCount.incrementAndGet();
- if(LOG.isDebugEnabled()){
- LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
+ protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
+ int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
+ Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
+ eventLoopGroupPool.put(clientId, loopGroupQueue);
+
+ for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
+ loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
}
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
- ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
+ return loopGroupQueue;
+ }
- NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
- new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
- NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
- ThreadNameDeterminer.CURRENT);
+ protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
+ }
+
+ ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+ ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
- return new NioClientSocketChannelFactory(bossPool, workerPool);
+ return new NioEventLoopGroup(workerNum, clientFactory);
}
// Client must release the external resources
- public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) {
+ public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
name = name + "-" + serverCount.incrementAndGet();
if(LOG.isInfoEnabled()){
LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
@@ -93,22 +145,38 @@ public final class RpcChannelFactory {
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-
- NioServerBossPool bossPool =
- new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT);
- NioWorkerPool workerPool =
- new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT);
-
- return new NioServerSocketChannelFactory(bossPool, workerPool);
+
+ EventLoopGroup bossGroup =
+ new NioEventLoopGroup(1, bossFactory);
+ EventLoopGroup workerGroup =
+ new NioEventLoopGroup(workerNum, workerFactory);
+
+ return new ServerBootstrap().group(bossGroup, workerGroup);
}
- public static synchronized void shutdown(){
+ public static void shutdownGracefully(){
if(LOG.isDebugEnabled()) {
LOG.debug("Shutdown Shared RPC Pool");
}
- if (factory != null) {
- factory.releaseExternalResources();
+
+ synchronized(lockObjectForLoopGroup) {
+ for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
+ for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
+ eventLoopGroup.shutdownGracefully();
+ }
+
+ eventLoopGroupQueue.clear();
+ }
+ eventLoopGroupPool.clear();
+ }
+ }
+
+ static class CleanUpHandler extends Thread {
+
+ @Override
+ public void run() {
+ RpcChannelFactory.shutdownGracefully();
}
- factory = null;
+
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index c8e622b..4ad9771 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -21,79 +21,71 @@ package org.apache.tajo.rpc;
import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.logging.CommonsLoggerFactory;
-import org.jboss.netty.logging.InternalLoggerFactory;
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import io.netty.util.internal.logging.CommonsLoggerFactory;
+import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
public class RpcConnectionPool {
private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
- private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections =
- new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>();
- private ChannelGroup accepted = new DefaultChannelGroup();
+ private Map<RpcConnectionKey, NettyClientBase> connections =
+ new HashMap<RpcConnectionKey, NettyClientBase>();
+ private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private static RpcConnectionPool instance;
- private final ClientSocketChannelFactory channelFactory;
+ private final Object lockObject = new Object();
public final static int RPC_RETRIES = 3;
- private RpcConnectionPool(ClientSocketChannelFactory channelFactory) {
- this.channelFactory = channelFactory;
+ private RpcConnectionPool() {
}
public synchronized static RpcConnectionPool getPool() {
if(instance == null) {
InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory());
- instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory());
+ instance = new RpcConnectionPool();
}
return instance;
}
- public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) {
- return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
- }
-
private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey)
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
NettyClientBase client;
if(rpcConnectionKey.asyncMode) {
- client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+ client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
+ RPC_RETRIES);
} else {
- client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES);
+ client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr,
+ RPC_RETRIES);
}
accepted.add(client.getChannel());
return client;
}
public NettyClientBase getConnection(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode)
+ Class<?> protocolClass, boolean asyncMode)
throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException {
RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
NettyClientBase client = connections.get(key);
if (client == null) {
- boolean added;
- synchronized (connections){
- client = makeConnection(key);
- connections.put(key, client);
- added = true;
- }
-
- if (!added) {
- client.close();
+ synchronized (lockObject){
client = connections.get(key);
+ if (client == null) {
+ client = makeConnection(key);
+ connections.put(key, client);
+ }
}
}
- if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) {
+ if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) {
LOG.warn("Try to reconnect : " + addr);
client.connect(addr);
}
@@ -104,9 +96,11 @@ public class RpcConnectionPool {
if (client == null) return;
try {
- if (!client.getChannel().isOpen()) {
- connections.remove(client.getKey());
- client.close();
+ synchronized (lockObject) {
+ if (!client.getChannel().isOpen()) {
+ connections.remove(client.getKey());
+ client.close();
+ }
}
if(LOG.isDebugEnabled()) {
@@ -128,8 +122,10 @@ public class RpcConnectionPool {
LOG.debug("Close connection [" + client.getKey() + "]");
}
- connections.remove(client.getKey());
- client.close();
+ synchronized (lockObject) {
+ connections.remove(client.getKey());
+ client.close();
+ }
} catch (Exception e) {
LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
@@ -140,7 +136,7 @@ public class RpcConnectionPool {
if(LOG.isDebugEnabled()) {
LOG.debug("Pool Closed");
}
- synchronized(connections) {
+ synchronized(lockObject) {
for(NettyClientBase eachClient: connections.values()) {
try {
eachClient.close();
@@ -148,11 +144,12 @@ public class RpcConnectionPool {
LOG.error("close client pool error", e);
}
}
+
+ connections.clear();
}
- connections.clear();
try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ accepted.close();
} catch (Throwable t) {
LOG.error(t);
}
@@ -160,18 +157,16 @@ public class RpcConnectionPool {
public synchronized void shutdown(){
close();
- if(channelFactory != null){
- channelFactory.releaseExternalResources();
- }
+ RpcChannelFactory.shutdownGracefully();
}
static class RpcConnectionKey {
final InetSocketAddress addr;
- final Class protocolClass;
+ final Class<?> protocolClass;
final boolean asyncMode;
public RpcConnectionKey(InetSocketAddress addr,
- Class protocolClass, boolean asyncMode) {
+ Class<?> protocolClass, boolean asyncMode) {
this.addr = addr;
this.protocolClass = protocolClass;
this.asyncMode = asyncMode;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
index 140f781..fb1cec2 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java
@@ -18,30 +18,30 @@
package org.apache.tajo.rpc;
-import com.google.protobuf.ServiceException;
-
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import com.google.protobuf.ServiceException;
+
public abstract class ServerCallable<T> {
protected InetSocketAddress addr;
protected long startTime;
protected long endTime;
- protected Class protocol;
+ protected Class<?> protocol;
protected boolean asyncMode;
protected boolean closeConn;
protected RpcConnectionPool connPool;
public abstract T call(NettyClientBase client) throws Exception;
- public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol, boolean asyncMode) {
+ public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) {
this(connPool, addr, protocol, asyncMode, false);
}
- public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol,
+ public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol,
boolean asyncMode, boolean closeConn) {
this.connPool = connPool;
this.addr = addr;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 61a92bc..31d5265 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -27,13 +27,21 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+import io.netty.channel.ConnectTimeoutException;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -47,43 +55,102 @@ public class TestAsyncRpc {
double sum;
String echo;
- static AsyncRpcServer server;
- static AsyncRpcClient client;
- static Interface stub;
- static DummyProtocolAsyncImpl service;
- ClientSocketChannelFactory clientChannelFactory;
+ AsyncRpcServer server;
+ AsyncRpcClient client;
+ Interface stub;
+ DummyProtocolAsyncImpl service;
int retries;
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @interface SetupRpcConnection {
+ boolean setupRpcServer() default true;
+ boolean setupRpcClient() default true;
+ }
+
+ @Rule
+ public ExternalResource resource = new ExternalResource() {
+
+ private Description description;
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ this.description = description;
+ return super.apply(base, description);
+ }
- @Before
- public void setUp() throws Exception {
- retries = 1;
+ @Override
+ protected void before() throws Throwable {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ setUpRpcServer();
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ setUpRpcClient();
+ }
+ }
+
+ @Override
+ protected void after() {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2);
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ try {
+ tearDownRpcClient();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ try {
+ tearDownRpcServer();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ };
+
+ public void setUpRpcServer() throws Exception {
service = new DummyProtocolAsyncImpl();
server = new AsyncRpcServer(DummyProtocol.class,
service, new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
+ }
+
+ public void setUpRpcClient() throws Exception {
+ retries = 1;
+
client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), retries);
stub = client.getStub();
}
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
-
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ public void tearDownRpcServer() throws Exception {
if(server != null) {
server.shutdown();
+ server = null;
}
-
- if (clientChannelFactory != null) {
- clientChannelFactory.releaseExternalResources();
+ }
+
+ public void tearDownRpcClient() throws Exception {
+ if(client != null) {
+ client.close();
+ client = null;
}
}
boolean calledMarker = false;
+
@Test
public void testRpc() throws Exception {
@@ -130,7 +197,7 @@ public class TestAsyncRpc {
testNullLatch.countDown();
}
});
- testNullLatch.await(1000, TimeUnit.MILLISECONDS);
+ assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS));
assertTrue(service.getNullCalled);
}
@@ -169,8 +236,7 @@ public class TestAsyncRpc {
.setMessage(MESSAGE).build();
CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- server.shutdown();
- server = null;
+ tearDownRpcServer();
stub.echo(future.getController(), echoMessage, future);
EchoMessage response = future.get();
@@ -187,8 +253,10 @@ public class TestAsyncRpc {
.setMessage(MESSAGE).build();
CallFuture<EchoMessage> future = new CallFuture<EchoMessage>();
- server.shutdown();
- server = null;
+ if (server != null) {
+ server.shutdown(true);
+ server = null;
+ }
stub = client.getStub();
stub.echo(future.getController(), echoMessage, future);
@@ -200,10 +268,13 @@ public class TestAsyncRpc {
}
@Test
+ @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
public void testConnectionRetry() throws Exception {
retries = 10;
- final InetSocketAddress address = server.getListenAddress();
- tearDown();
+ ServerSocket serverSocket = new ServerSocket(0);
+ final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+ serverSocket.close();
+ service = new DummyProtocolAsyncImpl();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -214,7 +285,7 @@ public class TestAsyncRpc {
@Override
public void run() {
try {
- Thread.sleep(100);
+ Thread.sleep(1000);
server = new AsyncRpcServer(DummyProtocol.class,
service, address, 2);
} catch (Exception e) {
@@ -225,8 +296,7 @@ public class TestAsyncRpc {
});
serverThread.start();
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
- client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ client = new AsyncRpcClient(DummyProtocol.class, address, retries);
stub = client.getStub();
stub.echo(future.getController(), echoMessage, future);
@@ -240,7 +310,7 @@ public class TestAsyncRpc {
InetSocketAddress address = new InetSocketAddress("test", 0);
boolean expected = false;
try {
- new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ new AsyncRpcClient(DummyProtocol.class, address, retries);
fail();
} catch (ConnectTimeoutException e) {
expected = true;
@@ -251,13 +321,11 @@ public class TestAsyncRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
- client.close();
- client = null;
-
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new AsyncRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), retries);
Interface stub = client.getStub();
EchoMessage echoMessage = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 746bfcb..07e2dca 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -24,13 +24,20 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.Rule;
import org.junit.Test;
-
+import org.junit.rules.ExternalResource;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
import java.net.ConnectException;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,35 +51,92 @@ public class TestBlockingRpc {
private BlockingInterface stub;
private DummyProtocolBlockingImpl service;
private int retries;
- private ClientSocketChannelFactory clientChannelFactory;
-
- @Before
- public void setUp() throws Exception {
- retries = 1;
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @interface SetupRpcConnection {
+ boolean setupRpcServer() default true;
+ boolean setupRpcClient() default true;
+ }
+
+ @Rule
+ public ExternalResource resource = new ExternalResource() {
+
+ private Description description;
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ this.description = description;
+ return super.apply(base, description);
+ }
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
+ @Override
+ protected void before() throws Throwable {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ setUpRpcServer();
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ setUpRpcClient();
+ }
+ }
+ @Override
+ protected void after() {
+ SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class);
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) {
+ try {
+ tearDownRpcClient();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+
+ if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) {
+ try {
+ tearDownRpcServer();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
+ }
+
+ };
+
+ public void setUpRpcServer() throws Exception {
service = new DummyProtocolBlockingImpl();
server = new BlockingRpcServer(DummyProtocol.class, service,
new InetSocketAddress("127.0.0.1", 0), 2);
server.start();
+ }
+
+ public void setUpRpcClient() throws Exception {
+ retries = 1;
+
client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries);
+ RpcUtils.getConnectAddress(server.getListenAddress()), retries);
stub = client.getStub();
}
- @After
- public void tearDown() throws Exception {
- if(client != null) {
- client.close();
- }
-
+ @AfterClass
+ public static void tearDownClass() throws Exception {
+ RpcChannelFactory.shutdownGracefully();
+ }
+
+ public void tearDownRpcServer() throws Exception {
if(server != null) {
server.shutdown();
+ server = null;
}
-
- if(clientChannelFactory != null){
- clientChannelFactory.releaseExternalResources();
+ }
+
+ public void tearDownRpcClient() throws Exception {
+ if(client != null) {
+ client.close();
+ client = null;
}
}
@@ -93,8 +157,9 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testRpcWithServiceCallable() throws Exception {
- RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2);
+ RpcConnectionPool pool = RpcConnectionPool.getPool();
final SumRequest request = SumRequest.newBuilder()
.setX1(1)
.setX2(2)
@@ -148,10 +213,12 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false)
public void testConnectionRetry() throws Exception {
retries = 10;
- final InetSocketAddress address = server.getListenAddress();
- tearDown();
+ ServerSocket serverSocket = new ServerSocket(0);
+ final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort());
+ serverSocket.close();
EchoMessage message = EchoMessage.newBuilder()
.setMessage(MESSAGE).build();
@@ -161,8 +228,8 @@ public class TestBlockingRpc {
@Override
public void run() {
try {
- Thread.sleep(100);
- server = new BlockingRpcServer(DummyProtocol.class, service, address, 2);
+ Thread.sleep(1000);
+ server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2);
} catch (Exception e) {
fail(e.getMessage());
}
@@ -171,8 +238,7 @@ public class TestBlockingRpc {
});
serverThread.start();
- clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2);
- client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries);
+ client = new BlockingRpcClient(DummyProtocol.class, address, retries);
stub = client.getStub();
EchoMessage response = stub.echo(null, message);
@@ -182,14 +248,20 @@ public class TestBlockingRpc {
@Test
public void testConnectionFailed() throws Exception {
boolean expected = false;
+ NettyClientBase client = null;
+
try {
int port = server.getListenAddress().getPort() + 1;
- new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries);
+ client = new BlockingRpcClient(DummyProtocol.class,
+ RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries);
+ client.close();
fail("Connection should be failed.");
} catch (ConnectException ce) {
expected = true;
} catch (Throwable ce){
+ if (client != null) {
+ client.close();
+ }
fail();
}
assertTrue(expected);
@@ -240,7 +312,7 @@ public class TestBlockingRpc {
};
shutdownThread.start();
- latch.await(5 * 1000, TimeUnit.MILLISECONDS);
+ assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS));
assertTrue(latch.getCount() == 0);
@@ -254,13 +326,11 @@ public class TestBlockingRpc {
}
@Test
+ @SetupRpcConnection(setupRpcClient=false)
public void testUnresolvedAddress() throws Exception {
- client.close();
- client = null;
-
String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress());
client = new BlockingRpcClient(DummyProtocol.class,
- RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries);
+ RpcUtils.createUnresolved(hostAndPort), retries);
BlockingInterface stub = client.getStub();
EchoMessage message = EchoMessage.newBuilder()
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
index 90499ce..0ca7563 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java
@@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage;
import org.apache.tajo.rpc.test.TestProtos.SumRequest;
import org.apache.tajo.rpc.test.TestProtos.SumResponse;
-@SuppressWarnings("UnusedDeclaration")
public class DummyProtocolAsyncImpl implements Interface {
private static final Log LOG =
LogFactory.getLog(DummyProtocolAsyncImpl.class);
@@ -74,7 +73,7 @@ public class DummyProtocolAsyncImpl implements Interface {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage());
}
done.run(request);
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 5513aa6..957b4c1 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -168,6 +168,18 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-common</artifactId>
<scope>provided</scope>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
index cf8a54e..389cd31 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java
@@ -21,13 +21,16 @@ package org.apache.tajo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
@@ -38,20 +41,20 @@ public class HttpFileServer {
private final InetSocketAddress addr;
private InetSocketAddress bindAddr;
private ServerBootstrap bootstrap = null;
- private ChannelFactory factory = null;
+ private EventLoopGroup eventloopGroup = null;
private ChannelGroup channelGroup = null;
public HttpFileServer(final InetSocketAddress addr) {
this.addr = addr;
- this.factory = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
- 2);
+ this.eventloopGroup = new NioEventLoopGroup(2, Executors.defaultThreadFactory());
// Configure the server.
- this.bootstrap = new ServerBootstrap(factory);
- // Set up the event pipeline factory.
- this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory());
- this.channelGroup = new DefaultChannelGroup();
+ this.bootstrap = new ServerBootstrap();
+ this.bootstrap.childHandler(new HttpFileServerChannelInitializer())
+ .group(eventloopGroup)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .channel(NioServerSocketChannel.class);
+ this.channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
public HttpFileServer(String bindaddr) {
@@ -60,9 +63,9 @@ public class HttpFileServer {
public void start() {
// Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(addr);
- channelGroup.add(channel);
- this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+ ChannelFuture future = bootstrap.bind(addr).syncUninterruptibly();
+ channelGroup.add(future.channel());
+ this.bindAddr = (InetSocketAddress) future.channel().localAddress();
LOG.info("HttpFileServer starts up ("
+ this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+ ")");
@@ -73,9 +76,8 @@ public class HttpFileServer {
}
public void stop() {
- ChannelGroupFuture future = channelGroup.close();
- future.awaitUninterruptibly();
- factory.releaseExternalResources();
+ channelGroup.close();
+ eventloopGroup.shutdownGracefully();
LOG.info("HttpFileServer shutdown ("
+ this.bindAddr.getAddress().getHostAddress() + ":"
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
new file mode 100644
index 0000000..f2a97b6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+public class HttpFileServerChannelInitializer extends ChannelInitializer<Channel> {
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+
+ // Uncomment the following lines if you want HTTPS
+ //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ //engine.setUseClientMode(false);
+ //pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+
+ pipeline.addLast("handler", new HttpFileServerHandler());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
index 6c77317..78902f3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java
@@ -18,16 +18,13 @@
package org.apache.tajo;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
@@ -35,39 +32,34 @@ import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
-/**
- * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6
- */
-public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
+public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+
+ private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class);
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+
+ if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
final String path = sanitizeUri(request.getUri());
if (path == null) {
- sendError(ctx, FORBIDDEN);
+ sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
File file = new File(path);
if (file.isHidden() || !file.exists()) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
if (!file.isFile()) {
- sendError(ctx, FORBIDDEN);
+ sendError(ctx, HttpResponseStatus.FORBIDDEN);
return;
}
@@ -75,62 +67,62 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
try {
raf = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
long fileLength = raf.length();
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
- setContentLength(response, fileLength);
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ HttpHeaders.setContentLength(response, fileLength);
setContentTypeHeader(response);
- Channel ch = e.getChannel();
-
// Write the initial line and the header.
- ch.write(response);
+ ctx.write(response);
// Write the content.
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192));
+ lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)));
} else {
// No encryption - use zero-copy.
- final FileRegion region =
- new DefaultFileRegion(raf.getChannel(), 0, fileLength);
- writeFuture = ch.write(region);
- writeFuture.addListener(new ChannelFutureProgressListener() {
- public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
+ final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
+ writeFuture = ctx.write(region);
+ lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ writeFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total)
+ throws Exception {
+ LOG.trace(String.format("%s: %d / %d", path, progress, total));
}
- public void operationProgressed(
- ChannelFuture future, long amount, long current, long total) {
- System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount);
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) throws Exception {
+ region.release();
}
});
}
// Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
+ if (!HttpHeaders.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
- writeFuture.addListener(ChannelFutureListener.CLOSE);
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
+ sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
+ LOG.error(cause.getMessage(), cause);
+ if (ch.isActive()) {
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@@ -161,14 +153,13 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n",
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+ Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n",
CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
@@ -178,7 +169,7 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler {
* HTTP response
*/
private static void setContentTypeHeader(HttpResponse response) {
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
deleted file mode 100644
index cecf93b..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-// Uncomment the following lines if you want HTTPS
-//import javax.net.ssl.SSLEngine;
-//import org.jboss.netty.example.securechat.SecureChatSslContextFactory;
-//import org.jboss.netty.handler.ssl.SslHandler;
-
-//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6
-public class HttpFileServerPipelineFactory implements ChannelPipelineFactory {
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following lines if you want HTTPS
- //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
- //engine.setUseClientMode(false);
- //pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-
- pipeline.addLast("handler", new HttpFileServerHandler());
- return pipeline;
- }
-}
\ No newline at end of file
[3/3] tajo git commit: TAJO-527: Upgrade to Netty 4
Posted by ji...@apache.org.
TAJO-527: Upgrade to Netty 4
Closes #311
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/22876a82
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/22876a82
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/22876a82
Branch: refs/heads/master
Commit: 22876a825e9d19b0f599c342d4ae3902d85f2c4d
Parents: 64e47a4
Author: Jihun Kang <ji...@apache.org>
Authored: Tue Mar 3 22:10:21 2015 +0900
Committer: Jihun Kang <ji...@apache.org>
Committed: Tue Mar 3 22:10:21 2015 +0900
----------------------------------------------------------------------
CHANGES | 1 +
.../org/apache/tajo/client/QueryClientImpl.java | 3 +-
.../apache/tajo/client/SessionConnection.java | 15 +-
tajo-core/pom.xml | 4 +
.../java/org/apache/tajo/master/TajoMaster.java | 2 +-
.../tajo/worker/ExecutionBlockContext.java | 37 +--
.../java/org/apache/tajo/worker/Fetcher.java | 198 +++++++-------
.../java/org/apache/tajo/worker/TajoWorker.java | 2 +-
.../main/java/org/apache/tajo/worker/Task.java | 12 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 4 +-
.../apache/tajo/worker/TaskRunnerManager.java | 13 +-
.../apache/tajo/master/TestRepartitioner.java | 5 +-
.../org/apache/tajo/worker/TestFetcher.java | 25 +-
tajo-project/pom.xml | 24 +-
tajo-pullserver/pom.xml | 8 +
.../tajo/pullserver/FadvisedChunkedFile.java | 17 +-
.../tajo/pullserver/FadvisedFileRegion.java | 16 +-
.../tajo/pullserver/FileCloseListener.java | 8 +-
.../HttpDataServerChannelInitializer.java | 58 +++++
.../tajo/pullserver/HttpDataServerHandler.java | 137 +++++-----
.../HttpDataServerPipelineFactory.java | 56 ----
.../tajo/pullserver/PullServerAuxService.java | 229 ++++++++--------
.../tajo/pullserver/TajoPullServerService.java | 259 ++++++++++---------
.../retriever/AdvancedDataRetriever.java | 10 +-
.../pullserver/retriever/DataRetriever.java | 4 +-
.../retriever/DirectoryRetriever.java | 5 +-
tajo-rpc/pom.xml | 10 +-
.../org/apache/tajo/rpc/AsyncRpcClient.java | 106 +++++---
.../org/apache/tajo/rpc/AsyncRpcServer.java | 126 ++++-----
.../org/apache/tajo/rpc/BlockingRpcClient.java | 122 +++++----
.../org/apache/tajo/rpc/BlockingRpcServer.java | 125 +++++----
.../java/org/apache/tajo/rpc/CallFuture.java | 8 +-
.../apache/tajo/rpc/DefaultRpcController.java | 7 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 133 ++++++----
.../org/apache/tajo/rpc/NettyServerBase.java | 82 +++---
.../java/org/apache/tajo/rpc/NullCallback.java | 2 +-
.../tajo/rpc/ProtoChannelInitializer.java | 50 ++++
.../apache/tajo/rpc/ProtoPipelineFactory.java | 50 ----
.../org/apache/tajo/rpc/RpcChannelFactory.java | 160 ++++++++----
.../org/apache/tajo/rpc/RpcConnectionPool.java | 87 +++----
.../org/apache/tajo/rpc/ServerCallable.java | 10 +-
.../java/org/apache/tajo/rpc/TestAsyncRpc.java | 144 ++++++++---
.../org/apache/tajo/rpc/TestBlockingRpc.java | 138 +++++++---
.../rpc/test/impl/DummyProtocolAsyncImpl.java | 3 +-
tajo-storage/tajo-storage-hdfs/pom.xml | 12 +
.../java/org/apache/tajo/HttpFileServer.java | 44 ++--
.../tajo/HttpFileServerChannelInitializer.java | 47 ++++
.../org/apache/tajo/HttpFileServerHandler.java | 109 ++++----
.../tajo/HttpFileServerPipelineFactory.java | 54 ----
49 files changed, 1552 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e8c8b18..668c0db 100644
--- a/CHANGES
+++ b/CHANGES
@@ -7,6 +7,7 @@ Release 0.11.0 - unreleased
IMPROVEMENT
+ TAJO-527: Upgrade to Netty 4. (jihun)
BUG FIXES
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index bc89679..fae613a 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -19,6 +19,7 @@
package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.*;
@@ -32,6 +33,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.util.ProtoUtil;
@@ -83,7 +85,6 @@ public class QueryClientImpl implements QueryClient {
@Override
public void close() {
-
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index f8762da..bcf6d8b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -34,7 +34,8 @@ import org.apache.tajo.rpc.ServerCallable;
import org.apache.tajo.service.ServiceTracker;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.ProtoUtil;
-import org.jboss.netty.channel.ConnectTimeoutException;
+
+import io.netty.channel.ConnectTimeoutException;
import java.io.Closeable;
import java.io.IOException;
@@ -84,11 +85,7 @@ public class SessionConnection implements Closeable {
this.properties = properties;
- //TODO separate ConfVars from TajoConf
- int workerNum = this.properties.getInt("tajo.rpc.client.worker-thread-num", 4);
-
- // Don't share connection pool per client
- connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum);
+ connPool = RpcConnectionPool.getPool();
userInfo = UserRoleInfo.getCurrentUser();
this.baseDatabase = baseDatabase != null ? baseDatabase : null;
@@ -130,7 +127,7 @@ public class SessionConnection implements Closeable {
if(!closed.get()){
try {
return connPool.getConnection(serviceTracker.getClientServiceAddress(),
- TajoMasterClientProtocol.class, false).isConnected();
+ TajoMasterClientProtocol.class, false).isActive();
} catch (Throwable e) {
return false;
}
@@ -288,10 +285,6 @@ public class SessionConnection implements Closeable {
} catch (Throwable e) {
}
-
- if(connPool != null) {
- connPool.shutdown();
- }
}
protected InetSocketAddress getTajoMasterAddr() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index ce9db73..d3c7ed6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -388,6 +388,10 @@
<version>3.1.1</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
<version>6.1.14</version>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 586abb0..6f7c5a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -559,7 +559,7 @@ public class TajoMaster extends CompositeService {
LOG.info("TajoMaster received SIGINT Signal");
LOG.info("============================================");
stop();
- RpcChannelFactory.shutdown();
+ RpcChannelFactory.shutdownGracefully();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 8cf94eb..813c502 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -42,9 +42,10 @@ import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
-import org.jboss.netty.channel.ConnectTimeoutException;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.Timer;
+import org.apache.tajo.worker.event.TaskRunnerStartEvent;
+
+import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -67,7 +68,7 @@ public class ExecutionBlockContext {
public AtomicInteger killedTasksNum = new AtomicInteger();
public AtomicInteger failedTasksNum = new AtomicInteger();
- private ClientSocketChannelFactory channelFactory;
+ private EventLoopGroup loopGroup;
// for temporal or intermediate files
private FileSystem localFS;
// for input files
@@ -184,12 +185,6 @@ public class ExecutionBlockContext {
tasks.clear();
resource.release();
-
- try {
- releaseShuffleChannelFactory();
- } catch (Throwable e) {
- LOG.error(e.getMessage(), e);
- }
}
public TajoConf getConf() {
@@ -267,30 +262,10 @@ public class ExecutionBlockContext {
return histories.get(runner.getId());
}
- public TajoWorker.WorkerContext getWorkerContext() {
+ public TajoWorker.WorkerContext getWorkerContext(){
return workerContext;
}
- protected ClientSocketChannelFactory getShuffleChannelFactory(){
- if(channelFactory == null) {
- int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM);
- channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
- }
- return channelFactory;
- }
-
- public Timer getRPCTimer() {
- return manager.getRPCTimer();
- }
-
- protected void releaseShuffleChannelFactory(){
- if(channelFactory != null) {
- channelFactory.shutdown();
- channelFactory.releaseExternalResources();
- channelFactory = null;
- }
- }
-
private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception {
getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 742a025..fc57a96 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,20 +18,33 @@
package org.apache.tajo.worker;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.timeout.ReadTimeoutException;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.jboss.netty.util.Timer;
+import org.apache.tajo.rpc.RpcChannelFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpContentDecompressor;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FileNotFoundException;
@@ -40,8 +53,7 @@ import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.FileChannel;
-
-import static org.jboss.netty.channel.Channels.pipeline;
+import java.util.concurrent.TimeUnit;
/**
* Fetcher fetches data from a given uri via HTTP protocol and stores them into
@@ -64,17 +76,15 @@ public class Fetcher {
private long fileLen;
private int messageReceiveCount;
private TajoProtos.FetcherState state;
- private Timer timer;
- private ClientBootstrap bootstrap;
+ private Bootstrap bootstrap;
- public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) {
+ public Fetcher(TajoConf conf, URI uri, FileChunk chunk) {
this.uri = uri;
this.fileChunk = chunk;
this.useLocalFile = !chunk.fromRemote();
this.state = TajoProtos.FetcherState.FETCH_INIT;
this.conf = conf;
- this.timer = timer;
String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -88,13 +98,18 @@ public class Fetcher {
}
if (!useLocalFile) {
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
- bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
- bootstrap.setOption("tcpNoDelay", true);
-
- ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile());
- bootstrap.setPipelineFactory(pipelineFactory);
+ bootstrap = new Bootstrap()
+ .group(
+ RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER,
+ conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // set 5 sec
+ .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M
+ .option(ChannelOption.TCP_NODELAY, true);
+
+ ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile());
+ bootstrap.handler(initializer);
}
}
@@ -132,30 +147,30 @@ public class Fetcher {
this.state = TajoProtos.FetcherState.FETCH_FETCHING;
ChannelFuture future = null;
try {
- future = bootstrap.connect(new InetSocketAddress(host, port));
+ future = bootstrap.clone().connect(new InetSocketAddress(host, port))
+ .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// Wait until the connection attempt succeeds or fails.
- Channel channel = future.awaitUninterruptibly().getChannel();
+ Channel channel = future.awaitUninterruptibly().channel();
if (!future.isSuccess()) {
- future.getChannel().close();
state = TajoProtos.FetcherState.FETCH_FAILED;
- throw new IOException(future.getCause());
+ throw new IOException(future.cause());
}
String query = uri.getPath()
+ (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
// Prepare the HTTP request.
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query);
- request.setHeader(HttpHeaders.Names.HOST, host);
- request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
- request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
+ request.headers().set(HttpHeaders.Names.HOST, host);
+ request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
+ request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP);
LOG.info("Status: " + getState() + ", URI:" + uri);
// Send the HTTP request.
- ChannelFuture channelFuture = channel.write(request);
+ ChannelFuture channelFuture = channel.writeAndFlush(request);
// Wait for the server to close the connection.
- channel.getCloseFuture().awaitUninterruptibly();
+ channel.closeFuture().awaitUninterruptibly();
channelFuture.addListener(ChannelFutureListener.CLOSE);
@@ -164,7 +179,7 @@ public class Fetcher {
} finally {
if(future != null){
// Close the channel to exit.
- future.getChannel().close();
+ future.channel().close();
}
this.finishTime = System.currentTimeMillis();
@@ -176,8 +191,7 @@ public class Fetcher {
return this.uri;
}
- class HttpClientHandler extends SimpleChannelUpstreamHandler {
- private volatile boolean readingChunks;
+ class HttpClientHandler extends ChannelInboundHandlerAdapter {
private final File file;
private RandomAccessFile raf;
private FileChannel fc;
@@ -185,27 +199,27 @@ public class Fetcher {
public HttpClientHandler(File file) throws FileNotFoundException {
this.file = file;
+ this.raf = new RandomAccessFile(file, "rw");
+ this.fc = raf.getChannel();
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
messageReceiveCount++;
- try {
- if (!readingChunks && e.getMessage() instanceof HttpResponse) {
-
- HttpResponse response = (HttpResponse) e.getMessage();
+ if (msg instanceof HttpResponse) {
+ try {
+ HttpResponse response = (HttpResponse) msg;
StringBuilder sb = new StringBuilder();
if (LOG.isDebugEnabled()) {
- sb.append("STATUS: ").append(response.getStatus())
- .append(", VERSION: ").append(response.getProtocolVersion())
- .append(", HEADER: ");
+ sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ")
+ .append(response.getProtocolVersion()).append(", HEADER: ");
}
- if (!response.getHeaderNames().isEmpty()) {
- for (String name : response.getHeaderNames()) {
- for (String value : response.getHeaders(name)) {
+ if (!response.headers().names().isEmpty()) {
+ for (String name : response.headers().names()) {
+ for (String value : response.headers().getAll(name)) {
if (LOG.isDebugEnabled()) {
sb.append(name).append(" = ").append(value);
}
@@ -219,109 +233,99 @@ public class Fetcher {
LOG.debug(sb.toString());
}
- if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) {
+ if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) {
LOG.warn("There are no data corresponding to the request");
length = 0;
return;
- } else if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()){
- LOG.error(response.getStatus().getReasonPhrase());
+ } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) {
+ LOG.error(response.getStatus().reasonPhrase());
state = TajoProtos.FetcherState.FETCH_FAILED;
return;
}
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
- this.raf = new RandomAccessFile(file, "rw");
- this.fc = raf.getChannel();
+ if (msg instanceof HttpContent) {
+ try {
+ HttpContent httpContent = (HttpContent) msg;
+ ByteBuf content = httpContent.content();
+ if (content.isReadable()) {
+ content.readBytes(fc, content.readableBytes());
+ }
- if (response.isChunked()) {
- readingChunks = true;
- } else {
- ChannelBuffer content = response.getContent();
- if (content.readable()) {
- fc.write(content.toByteBuffer());
+ if (msg instanceof LastHttpContent) {
+ if (raf != null) {
+ fileLen = file.length();
}
- }
- } else {
- HttpChunk chunk = (HttpChunk) e.getMessage();
- if (chunk.isLast()) {
- readingChunks = false;
- long fileLength = file.length();
- if (fileLength == length) {
- LOG.info("Data fetch is done (total received bytes: " + fileLength
- + ")");
- } else {
- LOG.info("Data fetch is done, but cannot get all data "
- + "(received/total: " + fileLength + "/" + length + ")");
+
+ IOUtils.cleanup(LOG, fc, raf);
+ if (ctx.channel().isActive()) {
+ ctx.channel().close();
}
- } else {
- if(fc != null){
- fc.write(chunk.getContent().toByteBuffer());
+ finishTime = System.currentTimeMillis();
+ if (state != TajoProtos.FetcherState.FETCH_FAILED) {
+ state = TajoProtos.FetcherState.FETCH_FINISHED;
}
}
- }
- } finally {
- if(raf != null) {
- fileLen = file.length();
- }
-
- if(fileLen == length){
- IOUtils.cleanup(LOG, fc, raf);
- finishTime = System.currentTimeMillis();
- state = TajoProtos.FetcherState.FETCH_FINISHED;
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- if (e.getCause() instanceof ReadTimeoutException) {
- LOG.warn(e.getCause());
+ if (cause instanceof ReadTimeoutException) {
+ LOG.warn(cause);
} else {
- LOG.error("Fetch failed :", e.getCause());
+ LOG.error("Fetch failed :", cause);
}
// this fetching will be retry
IOUtils.cleanup(LOG, fc, raf);
- if(ctx.getChannel().isConnected()){
- ctx.getChannel().close();
- }
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FAILED;
+ ctx.close();
}
@Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- super.channelDisconnected(ctx, e);
-
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){
//channel is closed, but cannot complete fetcher
finishTime = System.currentTimeMillis();
state = TajoProtos.FetcherState.FETCH_FAILED;
}
IOUtils.cleanup(LOG, fc, raf);
+
+ super.channelUnregistered(ctx);
}
}
- class HttpClientPipelineFactory implements
- ChannelPipelineFactory {
+ class HttpClientChannelInitializer extends ChannelInitializer<Channel> {
private final File file;
- public HttpClientPipelineFactory(File file) {
+ public HttpClientChannelInitializer(File file) {
this.file = file;
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = pipeline();
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
pipeline.addLast("inflater", new HttpContentDecompressor());
- pipeline.addLast("timeout", new ReadTimeoutHandler(timer, readTimeout));
+ pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS));
pipeline.addLast("handler", new HttpClientHandler(file));
- return pipeline;
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 7e2a233..3c55add 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -604,7 +604,7 @@ public class TajoWorker extends CompositeService {
LOG.info("TajoWorker received SIGINT Signal");
LOG.info("============================================");
stop();
- RpcChannelFactory.shutdown();
+ RpcChannelFactory.shutdownGracefully();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index df3be12..ef94337 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import io.netty.channel.EventLoopGroup;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,9 +56,8 @@ import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.util.Timer;
+
+import io.netty.handler.codec.http.QueryStringDecoder;
import java.io.File;
import java.io.IOException;
@@ -664,8 +664,6 @@ public class Task {
List<FetchImpl> fetches) throws IOException {
if (fetches.size() > 0) {
- ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory();
- Timer timer = executionBlockContext.getRPCTimer();
Path inputDir = executionBlockContext.getLocalDirAllocator().
getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
@@ -716,7 +714,7 @@ public class Task {
// If we decide that intermediate data should be really fetched from a remote host, storeChunk
// represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
storeChunk.setEbId(f.getName());
- Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer);
+ Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
runnerList.add(fetcher);
i++;
@@ -732,7 +730,7 @@ public class Task {
private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
// Parse the URI
LOG.info("getLocalStoredFileChunk starts");
- final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters();
+ final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
final List<String> types = params.get("type");
final List<String> qids = params.get("qid");
final List<String> taskIdList = params.get("ta");
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index cf50767..2cdebc8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -19,6 +19,7 @@
package org.apache.tajo.worker;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,7 +36,8 @@ import org.apache.tajo.master.container.TajoContainerIdPBImpl;
import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
-import org.jboss.netty.channel.ConnectTimeoutException;
+
+import io.netty.channel.ConnectTimeoutException;
import java.util.concurrent.*;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 570bd38..3f4a1b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -20,6 +20,7 @@ package org.apache.tajo.worker;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -33,8 +34,6 @@ import org.apache.tajo.engine.utils.TupleCache;
import org.apache.tajo.worker.event.TaskRunnerEvent;
import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.apache.tajo.worker.event.TaskRunnerStopEvent;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
import java.io.IOException;
import java.util.*;
@@ -52,7 +51,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
private AtomicBoolean stop = new AtomicBoolean(false);
private FinishedTaskCleanThread finishedTaskCleanThread;
private Dispatcher dispatcher;
- private HashedWheelTimer rpcTimer;
public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) {
super(TaskRunnerManager.class.getName());
@@ -77,7 +75,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
public void start() {
finishedTaskCleanThread = new FinishedTaskCleanThread();
finishedTaskCleanThread.start();
- rpcTimer = new HashedWheelTimer();
super.start();
}
@@ -102,10 +99,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
finishedTaskCleanThread.interrupted();
}
- if(rpcTimer != null){
- rpcTimer.stop();
- }
-
super.stop();
if(workerContext.isYarnContainerMode()) {
workerContext.stopWorker(true);
@@ -214,10 +207,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
return tajoConf;
}
- public Timer getRPCTimer(){
- return rpcTimer;
- }
-
class FinishedTaskCleanThread extends Thread {
//TODO if history size is large, the historyMap should remove immediately
public void run() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 438867e..9910d79 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -31,9 +31,10 @@ import org.apache.tajo.querymaster.Repartitioner;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.FetchImpl;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.Test;
+import io.netty.handler.codec.http.QueryStringDecoder;
+
import java.net.URI;
import java.util.*;
@@ -89,7 +90,7 @@ public class TestRepartitioner {
URI uri = uris.get(0);
final Map<String, List<String>> params =
- new QueryStringDecoder(uri).getParameters();
+ new QueryStringDecoder(uri).parameters();
assertEquals(eachEntry.getKey().toString(), params.get("p").get(0));
assertEquals("h", params.get("type").get(0));
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index b3654f9..513eb69 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -27,15 +27,9 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.TajoPullServerService;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.util.CommonTestingUtil;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.*;
import java.io.File;
import java.io.IOException;
@@ -50,8 +44,6 @@ public class TestFetcher {
private String OUTPUT_DIR = TEST_DATA+"/out/";
private TajoConf conf = new TajoConf();
private TajoPullServerService pullServerService;
- private ClientSocketChannelFactory channelFactory;
- private Timer timer;
@Before
public void setUp() throws Exception {
@@ -65,16 +57,11 @@ public class TestFetcher {
pullServerService = new TajoPullServerService();
pullServerService.init(conf);
pullServerService.start();
-
- channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
- timer = new HashedWheelTimer();
}
@After
public void tearDown(){
pullServerService.stop();
- channelFactory.releaseExternalResources();
- timer.stop();
}
@Test
@@ -102,7 +89,7 @@ public class TestFetcher {
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
FileChunk chunk = fetcher.get();
assertNotNull(chunk);
assertNotNull(chunk.getFile());
@@ -148,7 +135,7 @@ public class TestFetcher {
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -178,7 +165,7 @@ public class TestFetcher {
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -212,7 +199,7 @@ public class TestFetcher {
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
fetcher.get();
@@ -232,7 +219,7 @@ public class TestFetcher {
URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params);
FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0);
storeChunk.setFromRemote(true);
- final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer);
+ final Fetcher fetcher = new Fetcher(conf, uri, storeChunk);
assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
pullServerService.stop();
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 30f864c..3820d50 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -37,6 +37,7 @@
<protobuf.version>2.5.0</protobuf.version>
<tajo.version>0.10.0-SNAPSHOT</tajo.version>
<hbase.version>0.98.7-hadoop2</hbase.version>
+ <netty.version>4.0.25.Final</netty.version>
<tajo.root>${project.parent.relativePath}/..</tajo.root>
<extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
</properties>
@@ -1024,13 +1025,28 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.6.6.Final</version>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- <version>4.0.24.Final</version>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index 6d13a3c..cdbda3e 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -47,6 +47,14 @@
<dependencies>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-rpc</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
index b0b8d18..3df82e6 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java
@@ -22,7 +22,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.handler.stream.ChunkedFile;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedFile;
import java.io.FileDescriptor;
import java.io.IOException;
@@ -52,13 +55,13 @@ public class FadvisedChunkedFile extends ChunkedFile {
}
@Override
- public Object nextChunk() throws Exception {
+ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
readaheadRequest = readaheadPool
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
- getEndOffset(), readaheadRequest);
+ .readaheadStream(identifier, fd, currentOffset(), readaheadLength,
+ endOffset(), readaheadRequest);
}
- return super.nextChunk();
+ return super.readChunk(ctx);
}
@Override
@@ -66,11 +69,11 @@ public class FadvisedChunkedFile extends ChunkedFile {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && endOffset() - startOffset() > 0) {
try {
PullServerUtil.posixFadviseIfPossible(identifier,
fd,
- getStartOffset(), getEndOffset() - getStartOffset(),
+ startOffset(), endOffset() - startOffset(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
index 18cf4b6..643d9e0 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java
@@ -19,11 +19,13 @@
package org.apache.tajo.pullserver;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.nativeio.NativeIO;
-import org.jboss.netty.channel.DefaultFileRegion;
+
+import io.netty.channel.DefaultFileRegion;
import java.io.FileDescriptor;
import java.io.IOException;
@@ -79,8 +81,8 @@ public class FadvisedFileRegion extends DefaultFileRegion {
throws IOException {
if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- getPosition() + position, readaheadLength,
- getPosition() + getCount(), readaheadRequest);
+ position() + position, readaheadLength,
+ position() + count(), readaheadRequest);
}
if(this.shuffleTransferToAllowed) {
@@ -146,11 +148,11 @@ public class FadvisedFileRegion extends DefaultFileRegion {
@Override
- public void releaseExternalResources() {
+ protected void deallocate() {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
- super.releaseExternalResources();
+ super.deallocate();
}
/**
@@ -158,9 +160,9 @@ public class FadvisedFileRegion extends DefaultFileRegion {
* we don't need the region to be cached anymore.
*/
public void transferSuccessful() {
- if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) {
+ if (PullServerUtil.isNativeIOPossible() && manageOsCache && count() > 0 && super.isOpen()) {
try {
- PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(),
+ PullServerUtil.posixFadviseIfPossible(identifier, fd, position(), count(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
index 236db89..9c3c523 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java
@@ -18,10 +18,10 @@
package org.apache.tajo.pullserver;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.GenericFutureListener;
-public class FileCloseListener implements ChannelFutureListener {
+public class FileCloseListener implements GenericFutureListener<ChannelFuture> {
private FadvisedFileRegion filePart;
private String requestUri;
@@ -45,7 +45,7 @@ public class FileCloseListener implements ChannelFutureListener {
if(future.isSuccess()){
filePart.transferSuccessful();
}
- filePart.releaseExternalResources();
+ filePart.deallocate();
if (pullServerService != null) {
pullServerService.completeFileChunk(filePart, requestUri, startTime);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
new file mode 100644
index 0000000..8661ee5
--- /dev/null
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.pullserver;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.HttpContentCompressor;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+
+public class HttpDataServerChannelInitializer extends ChannelInitializer<Channel> {
+ private String userName;
+ private String appId;
+ public HttpDataServerChannelInitializer(String userName, String appId) {
+ this.userName = userName;
+ this.appId = appId;
+ }
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = channel.pipeline();
+
+ // Uncomment the following line if you want HTTPS
+ // SSLEngine engine =
+ // SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ // engine.setUseClientMode(false);
+ // pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+ pipeline.addLast("deflater", new HttpContentCompressor());
+ pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
index bfb70b4..472b967 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java
@@ -19,19 +19,21 @@
package org.apache.tajo.pullserver;
import com.google.common.collect.Lists;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.http.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.pullserver.retriever.DataRetriever;
import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
import java.io.*;
import java.net.URLDecoder;
@@ -41,14 +43,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
Map<ExecutionBlockId, DataRetriever> retrievers =
@@ -62,21 +57,18 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
+
+ if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
- String base =
- ContainerLocalizer.USERCACHE + "/" + userName + "/"
- + ContainerLocalizer.APPCACHE + "/"
- + appId + "/output" + "/";
+ String base = ContainerLocalizer.USERCACHE + "/" + userName + "/" + ContainerLocalizer.APPCACHE + "/" + appId
+ + "/output" + "/";
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
List<FileChunk> chunks = Lists.newArrayList();
List<String> taskIds = splitMaps(params.get("ta"));
@@ -90,65 +82,54 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
}
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-// try {
-// file = retriever.handle(ctx, request);
-// } catch (FileNotFoundException fnf) {
-// LOG.error(fnf);
-// sendError(ctx, NOT_FOUND);
-// return;
-// } catch (IllegalArgumentException iae) {
-// LOG.error(iae);
-// sendError(ctx, BAD_REQUEST);
-// return;
-// } catch (FileAccessForbiddenException fafe) {
-// LOG.error(fafe);
-// sendError(ctx, FORBIDDEN);
-// return;
-// } catch (IOException ioe) {
-// LOG.error(ioe);
-// sendError(ctx, INTERNAL_SERVER_ERROR);
-// return;
-// }
// Write the content.
- Channel ch = e.getChannel();
if (file == null) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+ if (!HttpHeaders.isKeepAlive(request)) {
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ ctx.writeAndFlush(response);
}
- } else {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ } else {
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ ChannelFuture writeFuture = null;
long totalSize = 0;
for (FileChunk chunk : file) {
totalSize += chunk.length();
}
- setContentLength(response, totalSize);
+ HttpHeaders.setContentLength(response, totalSize);
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
// Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
+ writeFuture = ctx.write(response);
for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
+ writeFuture = sendFile(ctx, chunk);
if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
}
+ if (ctx.pipeline().get(SslHandler.class) == null) {
+ writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ } else {
+ ctx.flush();
+ }
// Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
+ if (!HttpHeaders.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
writeFuture.addListener(ChannelFutureListener.CLOSE);
}
}
+
}
private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
FileChunk file) throws IOException {
RandomAccessFile raf;
try {
@@ -158,38 +139,41 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
}
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) != null) {
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) != null) {
// Cannot use zero-copy with HTTPS.
- writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(),
- file.length(), 8192));
+ lastContentFuture = ctx.write(new HttpChunkedInput(new ChunkedFile(raf, file.startOffset(),
+ file.length(), 8192)));
} else {
// No encryption - use zero-copy.
final FileRegion region = new DefaultFileRegion(raf.getChannel(),
file.startOffset(), file.length());
- writeFuture = ch.write(region);
+ writeFuture = ctx.write(region);
+ lastContentFuture = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT);
writeFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
- region.releaseExternalResources();
+ if (region.refCnt() > 0) {
+ region.release();
+ }
}
});
}
- return writeFuture;
+ return lastContentFuture;
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
+ sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
- cause.printStackTrace();
- if (ch.isConnected()) {
- sendError(ctx, INTERNAL_SERVER_ERROR);
+ LOG.error(cause.getMessage(), cause);
+ if (ch.isActive()) {
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
@@ -221,13 +205,12 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(ChannelBuffers.copiedBuffer(
- "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+ Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private List<String> splitMaps(List<String> qids) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index 4c8bd8b..0000000
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpContentCompressor;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
- private String userName;
- private String appId;
- public HttpDataServerPipelineFactory(String userName, String appId) {
- this.userName = userName;
- this.appId = appId;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- // Create a default pipeline implementation.
- ChannelPipeline pipeline = pipeline();
-
- // Uncomment the following line if you want HTTPS
- // SSLEngine engine =
- // SecureChatSslContextFactory.getServerContext().createSSLEngine();
- // engine.setUseClientMode(false);
- // pipeline.addLast("ssl", new SslHandler(engine));
-
- pipeline.addLast("decoder", new HttpRequestDecoder());
- //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
- pipeline.addLast("deflater", new HttpContentCompressor());
- pipeline.addLast("handler", new HttpDataServerHandler(userName, appId));
- return pipeline;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
index d633058..ce4018b 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
@@ -19,7 +19,22 @@
package org.apache.tajo.pullserver;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,23 +63,13 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
@@ -78,16 +83,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
public class PullServerAuxService extends AuxiliaryService {
@@ -100,9 +95,9 @@ public class PullServerAuxService extends AuxiliaryService {
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
+ private ServerBootstrap selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private HttpChannelInitializer initializer;
private int sslFileBufferSize;
private ApplicationId appId;
@@ -130,7 +125,7 @@ public class PullServerAuxService extends AuxiliaryService {
public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
@Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
- static class ShuffleMetrics implements ChannelFutureListener {
+ static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
@Metric({"OutputBytes","PullServer output in bytes"})
MutableCounterLong shuffleOutputBytes;
@Metric({"Failed","# of failed shuffle outputs"})
@@ -211,16 +206,10 @@ public class PullServerAuxService extends AuxiliaryService {
readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
DEFAULT_SHUFFLE_READAHEAD_BYTES);
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("PullServerAuxService Netty Worker #%d")
- .build();
-
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
+ selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", 0)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.TCP_NODELAY, true);
localFS = new LocalFileSystem();
super.init(new Configuration(conf));
@@ -233,20 +222,23 @@ public class PullServerAuxService extends AuxiliaryService {
@Override
public synchronized void start() {
Configuration conf = getConfig();
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ ServerBootstrap bootstrap = selector.clone();
try {
- pipelineFact = new HttpPipelineFactory(conf);
+ initializer = new HttpChannelInitializer(conf);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- bootstrap.setPipelineFactory(pipelineFact);
+ bootstrap.channel(NioServerSocketChannel.class)
+ .handler(initializer);
port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
ConfVars.PULLSERVER_PORT.defaultIntVal);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
- accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
+ .syncUninterruptibly();
+ accepted.add(future.channel());
+ port = ((InetSocketAddress)future.channel().localAddress()).getPort();
conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
- pipelineFact.PullServer.setPort(port);
+ initializer.PullServer.setPort(port);
LOG.info(getName() + " listening on port " + port);
super.start();
@@ -261,10 +253,19 @@ public class PullServerAuxService extends AuxiliaryService {
@Override
public synchronized void stop() {
try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- pipelineFact.destroy();
+ accepted.close();
+ if (selector != null) {
+ if (selector.group() != null) {
+ selector.group().shutdownGracefully();
+ }
+ if (selector.childGroup() != null) {
+ selector.childGroup().shutdownGracefully();
+ }
+ }
+
+ if (initializer != null) {
+ initializer.destroy();
+ }
localFS.close();
} catch (Throwable t) {
@@ -285,12 +286,12 @@ public class PullServerAuxService extends AuxiliaryService {
}
}
- class HttpPipelineFactory implements ChannelPipelineFactory {
+ class HttpChannelInitializer extends ChannelInitializer<Channel> {
final PullServer PullServer;
private SSLFactory sslFactory;
- public HttpPipelineFactory(Configuration conf) throws Exception {
+ public HttpChannelInitializer(Configuration conf) throws Exception {
PullServer = new PullServer(conf);
if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
@@ -306,24 +307,25 @@ public class PullServerAuxService extends AuxiliaryService {
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
if (sslFactory != null) {
pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
}
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+
pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", PullServer);
- return pipeline;
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
// TODO factor out decode of index to permit alt. models
}
}
- class PullServer extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
private final Configuration conf;
private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
private int port;
@@ -349,33 +351,27 @@ public class PullServerAuxService extends AuxiliaryService {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
throws Exception {
-
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
+ if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
// Parsing the URL into key-values
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
final List<String> types = params.get("type");
final List<String> taskIdList = params.get("ta");
final List<String> stageIds = params.get("sid");
final List<String> partitionIds = params.get("p");
- if (types == null || taskIdList == null || stageIds == null
- || partitionIds == null) {
- sendError(ctx, "Required type, taskIds, stage Id, and partition id",
- BAD_REQUEST);
+ if (types == null || taskIdList == null || stageIds == null || partitionIds == null) {
+ sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
return;
}
if (types.size() != 1 || stageIds.size() != 1) {
- sendError(ctx, "Required type, taskIds, stage Id, and partition id",
- BAD_REQUEST);
+ sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
return;
}
@@ -389,12 +385,11 @@ public class PullServerAuxService extends AuxiliaryService {
// the working dir of tajo worker for each query
String queryBaseDir = queryId + "/output" + "/";
- LOG.info("PullServer request param: repartitionType=" + repartitionType +
- ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList);
+ LOG.info("PullServer request param: repartitionType=" + repartitionType + ", sid=" + sid + ", partitionId="
+ + partitionId + ", taskIds=" + taskIdList);
String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
- if (taskLocalDir == null ||
- taskLocalDir.equals("")) {
+ if (taskLocalDir == null || taskLocalDir.equals("")) {
LOG.error("Tajo local directory should be specified.");
}
LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
@@ -402,9 +397,8 @@ public class PullServerAuxService extends AuxiliaryService {
// if a stage requires a range partitioning
if (repartitionType.equals("r")) {
String ta = taskIds.get(0);
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
- + ta + "/output/", conf));
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+ + "/output/", conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
@@ -415,19 +409,19 @@ public class PullServerAuxService extends AuxiliaryService {
chunk = getFileCunks(path, startKey, endKey, last);
} catch (Throwable t) {
LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+ sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
return;
}
if (chunk != null) {
chunks.add(chunk);
}
- // if a stage requires a hash repartition or a scattered hash repartition
+ // if a stage requires a hash repartition or a scattered hash
+ // repartition
} else if (repartitionType.equals("h") || repartitionType.equals("s")) {
for (String ta : taskIds) {
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
- ta + "/output/" + partitionId, conf));
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+ + "/output/" + partitionId, conf));
File file = new File(path.toUri());
FileChunk chunk = new FileChunk(file, 0, file.length());
chunks.add(chunk);
@@ -438,45 +432,54 @@ public class PullServerAuxService extends AuxiliaryService {
}
// Write the content.
- Channel ch = e.getChannel();
if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+ if (!HttpHeaders.isKeepAlive(request)) {
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ ctx.writeAndFlush(response);
}
- } else {
+ } else {
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ ChannelFuture writeFuture = null;
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
long totalSize = 0;
for (FileChunk chunk : file) {
totalSize += chunk.length();
}
- setContentLength(response, totalSize);
+ HttpHeaders.setContentLength(response, totalSize);
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
// Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
+ writeFuture = ctx.write(response);
for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk);
+ writeFuture = sendFile(ctx, chunk);
if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
}
+ if (ctx.pipeline().get(SslHandler.class) == null) {
+ writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ } else {
+ ctx.flush();
+ }
// Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
+ if (!HttpHeaders.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
writeFuture.addListener(ChannelFutureListener.CLOSE);
}
}
+
}
private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
FileChunk file) throws IOException {
RandomAccessFile spill;
try {
@@ -485,26 +488,27 @@ public class PullServerAuxService extends AuxiliaryService {
LOG.info(file.getFile() + " not found");
return null;
}
- ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
+
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(partition);
- writeFuture.addListener(new FileCloseListener(partition, null, 0, null));
+ lastContentFuture = ctx.write(partition);
+ lastContentFuture.addListener(new FileCloseListener(partition, null, 0, null));
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
file.startOffset(), file.length(), sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
file.getFile().getAbsolutePath());
- writeFuture = ch.write(chunk);
+ lastContentFuture = ctx.write(new HttpChunkedInput(chunk));
}
metrics.shuffleConnections.incr();
metrics.shuffleOutputBytes.incr(file.length()); // optimistic
- return writeFuture;
+ return lastContentFuture;
}
-
+
private void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
sendError(ctx, "", status);
@@ -512,29 +516,26 @@ public class PullServerAuxService extends AuxiliaryService {
private void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+ Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
+ Channel ch = ctx.channel();
if (cause instanceof TooLongFrameException) {
- sendError(ctx, BAD_REQUEST);
+ sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
LOG.error("PullServer error: ", cause);
- if (ch.isConnected()) {
- LOG.error("PullServer error " + e);
- sendError(ctx, INTERNAL_SERVER_ERROR);
+ if (ch.isActive()) {
+ sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
}
}
}
[2/3] tajo git commit: TAJO-527: Upgrade to Netty 4
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 860bc8e..f0dcd26 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -19,6 +19,10 @@
package org.apache.tajo.pullserver;
import com.google.common.collect.Lists;
+
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.*;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -53,15 +57,18 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.handler.codec.http.*;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.*;
import java.net.InetSocketAddress;
@@ -72,16 +79,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
public class TajoPullServerService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoPullServerService.class);
@@ -93,9 +92,9 @@ public class TajoPullServerService extends AbstractService {
public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- private HttpPipelineFactory pipelineFact;
+ private ServerBootstrap selector;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private HttpChannelInitializer channelInitializer;
private int sslFileBufferSize;
private ApplicationId appId;
@@ -131,7 +130,7 @@ public class TajoPullServerService extends AbstractService {
}
@Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
- static class ShuffleMetrics implements ChannelFutureListener {
+ static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
@Metric({"OutputBytes","PullServer output in bytes"})
MutableCounterLong shuffleOutputBytes;
@Metric({"Failed","# of failed shuffle outputs"})
@@ -212,7 +211,10 @@ public class TajoPullServerService extends AbstractService {
int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
Runtime.getRuntime().availableProcessors() * 2);
- selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
+ selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.TCP_NODELAY, true);
localFS = new LocalFileSystem();
@@ -228,23 +230,26 @@ public class TajoPullServerService extends AbstractService {
// TODO change AbstractService to throw InterruptedException
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
+ ServerBootstrap bootstrap = selector.clone();
try {
- pipelineFact = new HttpPipelineFactory(conf);
+ channelInitializer = new HttpChannelInitializer(conf);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
- bootstrap.setPipelineFactory(pipelineFact);
+ bootstrap.childHandler(channelInitializer)
+ .channel(NioServerSocketChannel.class);
port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
ConfVars.PULLSERVER_PORT.defaultIntVal);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
+ .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
+ .syncUninterruptibly();
- accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ accepted.add(future.channel());
+ port = ((InetSocketAddress)future.channel().localAddress()).getPort();
conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
- pipelineFact.PullServer.setPort(port);
+ channelInitializer.PullServer.setPort(port);
LOG.info(getName() + " listening on port " + port);
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -314,10 +319,19 @@ public class TajoPullServerService extends AbstractService {
@Override
public synchronized void stop() {
try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- pipelineFact.destroy();
+ accepted.close();
+ if (selector != null) {
+ if (selector.group() != null) {
+ selector.group().shutdownGracefully();
+ }
+ if (selector.childGroup() != null) {
+ selector.childGroup().shutdownGracefully();
+ }
+ }
+
+ if (channelInitializer != null) {
+ channelInitializer.destroy();
+ }
localFS.close();
} catch (Throwable t) {
@@ -337,12 +351,12 @@ public class TajoPullServerService extends AbstractService {
}
}
- class HttpPipelineFactory implements ChannelPipelineFactory {
+ class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
final PullServer PullServer;
private SSLFactory sslFactory;
- public HttpPipelineFactory(Configuration conf) throws Exception {
+ public HttpChannelInitializer(Configuration conf) throws Exception {
PullServer = new PullServer(conf);
if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
@@ -358,8 +372,8 @@ public class TajoPullServerService extends AbstractService {
}
@Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
if (sslFactory != null) {
pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
}
@@ -367,10 +381,9 @@ public class TajoPullServerService extends AbstractService {
int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
pipeline.addLast("chunking", new ChunkedWriteHandler());
pipeline.addLast("shuffle", PullServer);
- return pipeline;
// TODO factor security manager into pipeline
// TODO factor out encode/decode to permit binary shuffle
// TODO factor out decode of index to permit alt. models
@@ -408,31 +421,31 @@ public class TajoPullServerService extends AbstractService {
this.numFiles = numFiles;
this.remainFiles = new AtomicInteger(numFiles);
}
- public void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
- synchronized(remainFiles) {
- long fileSendTime = System.currentTimeMillis() - fileStartTime;
- if (fileSendTime > 20 * 1000) {
- LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount());
- numSlowFile++;
- }
- if (fileSendTime > maxTime) {
- maxTime = fileSendTime;
- }
- if (fileSendTime < minTime) {
- minTime = fileSendTime;
- }
- int remain = remainFiles.decrementAndGet();
- if (remain <= 0) {
- processingStatusMap.remove(requestUri);
- LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " +
- "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " +
- "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
- }
+
+ public synchronized void decrementRemainFiles(FileRegion filePart, long fileStartTime) {
+ long fileSendTime = System.currentTimeMillis() - fileStartTime;
+ if (fileSendTime > 20 * 1000) {
+ LOG.info("PullServer send too long time: filePos=" + filePart.position() + ", fileLen=" + filePart.count());
+ numSlowFile++;
+ }
+ if (fileSendTime > maxTime) {
+ maxTime = fileSendTime;
+ }
+ if (fileSendTime < minTime) {
+ minTime = fileSendTime;
+ }
+ int remain = remainFiles.decrementAndGet();
+ if (remain <= 0) {
+ processingStatusMap.remove(requestUri);
+ LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, "
+ + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, "
+ + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile);
}
}
}
- class PullServer extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
private final Configuration conf;
// private final IndexCache indexCache;
@@ -466,69 +479,58 @@ public class TajoPullServerService extends AbstractService {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
- throws Exception {
-
- accepted.add(evt.getChannel());
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.add(ctx.channel());
LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size()));
- super.channelOpen(ctx, evt);
-
+ super.channelRegistered(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
+ throws Exception {
- HttpRequest request = (HttpRequest) e.getMessage();
- if (request.getMethod() != GET) {
- sendError(ctx, METHOD_NOT_ALLOWED);
+ if (request.getMethod() != HttpMethod.GET) {
+ sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString());
processingStatusMap.put(request.getUri().toString(), processingStatus);
// Parsing the URL into key-values
- final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
final List<String> types = params.get("type");
final List<String> qids = params.get("qid");
final List<String> taskIdList = params.get("ta");
- final List<String> stageIds = params.get("sid");
+ final List<String> subQueryIds = params.get("sid");
final List<String> partIds = params.get("p");
final List<String> offsetList = params.get("offset");
final List<String> lengthList = params.get("length");
- if (types == null || stageIds == null || qids == null || partIds == null) {
- sendError(ctx, "Required queryId, type, stage Id, and part id",
- BAD_REQUEST);
+ if (types == null || subQueryIds == null || qids == null || partIds == null) {
+ sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
return;
}
- if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
- sendError(ctx, "Required qids, type, taskIds, stage Id, and part id",
- BAD_REQUEST);
+ if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) {
+ sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST);
return;
}
String partId = partIds.get(0);
String queryId = qids.get(0);
String shuffleType = types.get(0);
- String sid = stageIds.get(0);
+ String sid = subQueryIds.get(0);
long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
- if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) {
- sendError(ctx, "Required taskIds", BAD_REQUEST);
- }
-
List<String> taskIds = splitMaps(taskIdList);
String queryBaseDir = queryId.toString() + "/output";
if (LOG.isDebugEnabled()) {
- LOG.debug("PullServer request param: shuffleType=" + shuffleType +
- ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList);
+ LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
+ + ", taskIds=" + taskIdList);
// the working dir of tajo worker for each query
LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir);
@@ -539,13 +541,14 @@ public class TajoPullServerService extends AbstractService {
// if a stage requires a range shuffle
if (shuffleType.equals("r")) {
String ta = taskIds.get(0);
- if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){
- LOG.warn(e);
- sendError(ctx, NO_CONTENT);
+ String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/";
+ if (!lDirAlloc.ifExists(pathString, conf)) {
+ LOG.warn(pathString + "does not exist.");
+ sendError(ctx, HttpResponseStatus.NO_CONTENT);
return;
}
- Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf));
+ Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
+ + "/output/", conf));
String startKey = params.get("start").get(0);
String endKey = params.get("end").get(0);
boolean last = params.get("final") != null;
@@ -555,7 +558,7 @@ public class TajoPullServerService extends AbstractService {
chunk = getFileCunks(path, startKey, endKey, last);
} catch (Throwable t) {
LOG.error("ERROR Request: " + request.getUri(), t);
- sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST);
+ sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
return;
}
if (chunk != null) {
@@ -568,7 +571,7 @@ public class TajoPullServerService extends AbstractService {
String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId;
if (!lDirAlloc.ifExists(partPath, conf)) {
LOG.warn("Partition shuffle file not exists: " + partPath);
- sendError(ctx, NO_CONTENT);
+ sendError(ctx, HttpResponseStatus.NO_CONTENT);
return;
}
@@ -581,7 +584,7 @@ public class TajoPullServerService extends AbstractService {
if (startPos >= file.length()) {
String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]";
LOG.error(errorMessage);
- sendError(ctx, errorMessage, BAD_REQUEST);
+ sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST);
return;
}
LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length());
@@ -589,44 +592,53 @@ public class TajoPullServerService extends AbstractService {
chunks.add(chunk);
} else {
LOG.error("Unknown shuffle type: " + shuffleType);
- sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
+ sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST);
return;
}
processingStatus.setNumFiles(chunks.size());
processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime;
// Write the content.
- Channel ch = e.getChannel();
if (chunks.size() == 0) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
- ch.write(response);
- if (!isKeepAlive(request)) {
- ch.close();
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
+
+ if (!HttpHeaders.isKeepAlive(request)) {
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ } else {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ ctx.writeAndFlush(response);
}
- } else {
+ } else {
FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ ChannelFuture writeFuture = null;
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
long totalSize = 0;
for (FileChunk chunk : file) {
totalSize += chunk.length();
}
- setContentLength(response, totalSize);
+ HttpHeaders.setContentLength(response, totalSize);
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
// Write the initial line and the header.
- ch.write(response);
-
- ChannelFuture writeFuture = null;
+ writeFuture = ctx.write(response);
for (FileChunk chunk : file) {
- writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString());
+ writeFuture = sendFile(ctx, chunk, request.getUri().toString());
if (writeFuture == null) {
- sendError(ctx, NOT_FOUND);
+ sendError(ctx, HttpResponseStatus.NOT_FOUND);
return;
}
}
+ if (ctx.pipeline().get(SslHandler.class) == null) {
+ writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ } else {
+ ctx.flush();
+ }
// Decide whether to close the connection or not.
- if (!isKeepAlive(request)) {
+ if (!HttpHeaders.isKeepAlive(request)) {
// Close the connection when the whole content is written out.
writeFuture.addListener(ChannelFutureListener.CLOSE);
}
@@ -634,19 +646,18 @@ public class TajoPullServerService extends AbstractService {
}
private ChannelFuture sendFile(ChannelHandlerContext ctx,
- Channel ch,
FileChunk file,
String requestUri) throws IOException {
long startTime = System.currentTimeMillis();
- RandomAccessFile spill = null;
+ RandomAccessFile spill = null;
ChannelFuture writeFuture;
try {
spill = new RandomAccessFile(file.getFile(), "r");
- if (ch.getPipeline().get(SslHandler.class) == null) {
+ if (ctx.pipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion filePart = new FadvisedFileRegion(spill,
file.startOffset(), file.length(), manageOsCache, readaheadLength,
readaheadPool, file.getFile().getAbsolutePath());
- writeFuture = ch.write(filePart);
+ writeFuture = ctx.write(filePart);
writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this));
} else {
// HTTPS cannot be done with zero copy.
@@ -654,7 +665,7 @@ public class TajoPullServerService extends AbstractService {
file.startOffset(), file.length(), sslFileBufferSize,
manageOsCache, readaheadLength, readaheadPool,
file.getFile().getAbsolutePath());
- writeFuture = ch.write(chunk);
+ writeFuture = ctx.write(new HttpChunkedInput(chunk));
}
} catch (FileNotFoundException e) {
LOG.info(file.getFile() + " not found");
@@ -678,22 +689,20 @@ public class TajoPullServerService extends AbstractService {
private void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+ FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
+ Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+ response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- LOG.error(e.getCause().getMessage(), e.getCause());
- //if channel.close() is not called, never closed files in this request
- if (ctx.getChannel().isConnected()){
- ctx.getChannel().close();
+ LOG.error(cause.getMessage(), cause);
+ if (ctx.channel().isOpen()) {
+ ctx.channel().close();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
index 5591bba..fb91094 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java
@@ -20,6 +20,7 @@ package org.apache.tajo.pullserver.retriever;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
@@ -27,9 +28,10 @@ import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
import org.apache.tajo.pullserver.FileAccessForbiddenException;
import org.apache.tajo.util.TajoIdUtils;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.QueryStringDecoder;
import java.io.File;
import java.io.FileNotFoundException;
@@ -67,7 +69,7 @@ public class AdvancedDataRetriever implements DataRetriever {
throws IOException {
final Map<String, List<String>> params =
- new QueryStringDecoder(request.getUri()).getParameters();
+ new QueryStringDecoder(request.getUri()).parameters();
if (!params.containsKey("qid")) {
throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
index 8f55f7b..0a1ad41 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java
@@ -18,8 +18,8 @@
package org.apache.tajo.pullserver.retriever;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
index dc63929..e26bcd6 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java
@@ -18,11 +18,12 @@
package org.apache.tajo.pullserver.retriever;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
import org.apache.tajo.pullserver.FileAccessForbiddenException;
import org.apache.tajo.pullserver.HttpDataServerHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.HttpRequest;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index d0037ca..2dc3765 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -138,7 +138,15 @@
<dependencies>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 4b1842e..5845229 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -20,12 +20,15 @@ package org.apache.tajo.rpc;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.*;
+
+import io.netty.channel.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.GenericFutureListener;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -38,8 +41,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
public class AsyncRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
- private final ChannelUpstreamHandler handler;
- private final ChannelPipelineFactory pipeFactory;
+ private final ChannelInitializer<Channel> initializer;
private final ProxyRpcChannel rpcChannel;
private final AtomicInteger sequence = new AtomicInteger(0);
@@ -56,7 +58,7 @@ public class AsyncRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
AsyncRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+ final InetSocketAddress addr, int retries)
throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
this.protocol = protocol;
@@ -65,10 +67,9 @@ public class AsyncRpcClient extends NettyClientBase {
Class<?> serviceClass = Class.forName(serviceClassName);
stubMethod = serviceClass.getMethod("newStub", RpcChannel.class);
- this.handler = new ClientChannelUpstreamHandler();
- pipeFactory = new ProtoPipelineFactory(handler,
+ initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(),
RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory, factory, retries);
+ super.init(addr, initializer, retries);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, true);
}
@@ -83,7 +84,7 @@ public class AsyncRpcClient extends NettyClientBase {
try {
return (T) stubMethod.invoke(null, rpcChannel);
} catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
+ throw new RemoteException(e.getMessage(), e);
}
}
@@ -91,12 +92,32 @@ public class AsyncRpcClient extends NettyClientBase {
return this.rpcChannel;
}
+ protected void sendExceptions(String message) {
+ for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
+ ResponseCallback callback = callbackEntry.getValue();
+ Integer id = callbackEntry.getKey();
+
+ RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
+ .setErrorMessage(message)
+ .setId(id);
+
+ callback.run(responseBuilder.build());
+ }
+ }
+
+ @Override
+ public void close() {
+ sendExceptions("AsyncRpcClient terminates all the connections");
+
+ super.close();
+ }
+
private class ProxyRpcChannel implements RpcChannel {
- private final ClientChannelUpstreamHandler handler;
+ private final ClientChannelInboundHandler handler;
public ProxyRpcChannel() {
- this.handler = getChannel().getPipeline()
- .get(ClientChannelUpstreamHandler.class);
+ this.handler = getChannel().pipeline()
+ .get(ClientChannelInboundHandler.class);
if (handler == null) {
throw new IllegalArgumentException("Channel does not have " +
@@ -117,7 +138,17 @@ public class AsyncRpcClient extends NettyClientBase {
handler.registerCallback(nextSeqId,
new ResponseCallback(controller, responseType, done));
- getChannel().write(rpcRequest);
+ ChannelPromise channelPromise = getChannel().newPromise();
+ channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ handler.exceptionCaught(null, new ServiceException(future.cause()));
+ }
+ }
+ });
+ getChannel().writeAndFlush(rpcRequest, channelPromise);
}
private Message buildRequest(int seqId,
@@ -180,10 +211,11 @@ public class AsyncRpcClient extends NettyClientBase {
private String getErrorMessage(String message) {
return "Exception [" + protocol.getCanonicalName() +
"(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().getRemoteAddress()) + ")]: " + message;
+ getChannel().remoteAddress()) + ")]: " + message;
}
- private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
synchronized void registerCallback(int seqId, ResponseCallback callback) {
@@ -196,37 +228,39 @@ public class AsyncRpcClient extends NettyClientBase {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcResponse response = (RpcResponse) e.getMessage();
- ResponseCallback callback = requests.remove(response.getId());
+ if (msg instanceof RpcResponse) {
+ try {
+ RpcResponse response = (RpcResponse) msg;
+ ResponseCallback callback = requests.remove(response.getId());
- if (callback == null) {
- LOG.warn("Dangling rpc call");
- } else {
- callback.run(response);
+ if (callback == null) {
+ LOG.warn("Dangling rpc call");
+ } else {
+ callback.run(response);
+ }
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
-
- for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
- ResponseCallback callback = callbackEntry.getValue();
- Integer id = callbackEntry.getKey();
-
- RpcResponse.Builder responseBuilder = RpcResponse.newBuilder()
- .setErrorMessage(e.toString())
- .setId(id);
+ LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause);
- callback.run(responseBuilder.build());
- }
+ sendExceptions(cause.getMessage());
+
if(LOG.isDebugEnabled()) {
- LOG.error("" + e.getCause(), e.getCause());
+ LOG.error(cause.getMessage(), cause);
} else {
- LOG.error("RPC Exception:" + e.getCause());
+ LOG.error("RPC Exception:" + cause.getMessage());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index f9c5d3b..3b5a747 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -18,16 +18,16 @@
package org.apache.tajo.rpc;
+import com.google.protobuf.*;
import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
+
+import io.netty.channel.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
+
+import io.netty.util.ReferenceCountUtil;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
@@ -36,7 +36,7 @@ public class AsyncRpcServer extends NettyServerBase {
private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class);
private final Service service;
- private final ChannelPipelineFactory pipeline;
+ private final ChannelInitializer<Channel> initializer;
public AsyncRpcServer(final Class<?> protocol,
final Object instance,
@@ -52,87 +52,97 @@ public class AsyncRpcServer extends NettyServerBase {
Method method = serviceClass.getMethod("newReflectiveService", interfaceClass);
this.service = (Service) method.invoke(null, instance);
- ServerHandler handler = new ServerHandler();
- this.pipeline = new ProtoPipelineFactory(handler,
- RpcRequest.getDefaultInstance());
- super.init(this.pipeline, workerNum);
+ this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
+ super.init(this.initializer, workerNum);
}
- private class ServerHandler extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ private class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
- throws Exception {
-
- accepted.add(evt.getChannel());
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.add(ctx.channel());
if(LOG.isDebugEnabled()){
LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
}
- super.channelOpen(ctx, evt);
+ super.channelRegistered(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+ }
+ super.channelUnregistered(ctx);
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
+ if (msg instanceof RpcRequest) {
+ try {
+ final RpcRequest request = (RpcRequest) msg;
- final RpcRequest request = (RpcRequest) e.getMessage();
+ String methodName = request.getMethodName();
+ MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
- String methodName = request.getMethodName();
- MethodDescriptor methodDescriptor = service.getDescriptorForType().
- findMethodByName(methodName);
+ if (methodDescriptor == null) {
+ throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+ }
- if (methodDescriptor == null) {
- throw new RemoteCallException(request.getId(),
- new NoSuchMethodException(methodName));
- }
+ Message paramProto = null;
+ if (request.hasRequestMessage()) {
+ try {
+ paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+ .mergeFrom(request.getRequestMessage()).build();
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+ }
- Message paramProto = null;
- if (request.hasRequestMessage()) {
- try {
- paramProto = service.getRequestPrototype(methodDescriptor)
- .newBuilderForType().mergeFrom(request.getRequestMessage()).
- build();
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
- }
- }
+ final RpcController controller = new NettyRpcController();
- final Channel channel = e.getChannel();
- final RpcController controller = new NettyRpcController();
+ RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() {
- RpcCallback<Message> callback =
- !request.hasId() ? null : new RpcCallback<Message>() {
+ public void run(Message returnValue) {
- public void run(Message returnValue) {
+ RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
- RpcResponse.Builder builder = RpcResponse.newBuilder()
- .setId(request.getId());
+ if (returnValue != null) {
+ builder.setResponseMessage(returnValue.toByteString());
+ }
- if (returnValue != null) {
- builder.setResponseMessage(returnValue.toByteString());
- }
+ if (controller.failed()) {
+ builder.setErrorMessage(controller.errorText());
+ }
- if (controller.failed()) {
- builder.setErrorMessage(controller.errorText());
- }
+ ctx.writeAndFlush(builder.build());
+ }
+ };
- channel.write(builder.build());
- }
- };
+ service.callMethod(methodDescriptor, controller, paramProto, callback);
- service.callMethod(methodDescriptor, controller, paramProto, callback);
+ } finally {
+ ReferenceCountUtil.release(msg);
+ }
+ }
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception{
-
- if (e.getCause() instanceof RemoteCallException) {
- RemoteCallException callException = (RemoteCallException) e.getCause();
- e.getChannel().write(callException.getResponse());
+ if (cause instanceof RemoteCallException) {
+ RemoteCallException callException = (RemoteCallException) cause;
+ ctx.writeAndFlush(callException.getResponse());
} else {
- LOG.error(e.getCause());
+ LOG.error(cause.getMessage());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
}
}
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 869919c..4ec5718 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,22 +18,23 @@
package org.apache.tajo.rpc;
-import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.*;
import com.google.protobuf.Descriptors.MethodDescriptor;
-import com.google.protobuf.Message;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+
+import io.netty.channel.*;
+import io.netty.util.concurrent.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.util.ReferenceCountUtil;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.*;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
@@ -41,8 +42,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
public class BlockingRpcClient extends NettyClientBase {
private static final Log LOG = LogFactory.getLog(RpcProtos.class);
- private final ChannelUpstreamHandler handler;
- private final ChannelPipelineFactory pipeFactory;
+ private final ChannelInitializer<Channel> initializer;
private final ProxyRpcChannel rpcChannel;
private final AtomicInteger sequence = new AtomicInteger(0);
@@ -59,7 +59,7 @@ public class BlockingRpcClient extends NettyClientBase {
* new an instance through this constructor.
*/
BlockingRpcClient(final Class<?> protocol,
- final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries)
+ final InetSocketAddress addr, int retries)
throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException {
this.protocol = protocol;
@@ -69,10 +69,8 @@ public class BlockingRpcClient extends NettyClientBase {
stubMethod = serviceClass.getMethod("newBlockingStub",
BlockingRpcChannel.class);
- this.handler = new ClientChannelUpstreamHandler();
- pipeFactory = new ProtoPipelineFactory(handler,
- RpcResponse.getDefaultInstance());
- super.init(addr, pipeFactory, factory, retries);
+ initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance());
+ super.init(addr, initializer, retries);
rpcChannel = new ProxyRpcChannel();
this.key = new RpcConnectionKey(addr, protocol, false);
@@ -96,14 +94,24 @@ public class BlockingRpcClient extends NettyClientBase {
return this.rpcChannel;
}
+ @Override
+ public void close() {
+ for(ProtoCallFuture callback: requests.values()) {
+ callback.setFailed("BlockingRpcClient terminates all the connections",
+ new ServiceException("BlockingRpcClient terminates all the connections"));
+ }
+
+ super.close();
+ }
+
private class ProxyRpcChannel implements BlockingRpcChannel {
- private final ClientChannelUpstreamHandler handler;
+ private final ClientChannelInboundHandler handler;
public ProxyRpcChannel() {
- this.handler = getChannel().getPipeline().
- get(ClientChannelUpstreamHandler.class);
+ this.handler = getChannel().pipeline().
+ get(ClientChannelInboundHandler.class);
if (handler == null) {
throw new IllegalArgumentException("Channel does not have " +
@@ -125,10 +133,20 @@ public class BlockingRpcClient extends NettyClientBase {
ProtoCallFuture callFuture =
new ProtoCallFuture(controller, responsePrototype);
requests.put(nextSeqId, callFuture);
- getChannel().write(rpcRequest);
+
+ ChannelPromise channelPromise = getChannel().newPromise();
+ channelPromise.addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ handler.exceptionCaught(null, new ServiceException(future.cause()));
+ }
+ }
+ });
+ getChannel().writeAndFlush(rpcRequest, channelPromise);
try {
- return callFuture.get();
+ return callFuture.get(60, TimeUnit.SECONDS);
} catch (Throwable t) {
if (t instanceof ExecutionException) {
Throwable cause = t.getCause();
@@ -159,7 +177,7 @@ public class BlockingRpcClient extends NettyClientBase {
if(protocol != null && getChannel() != null) {
return protocol.getName() +
"(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
- getChannel().getRemoteAddress()) + "): " + message;
+ getChannel().remoteAddress()) + "): " + message;
} else {
return "Exception " + message;
}
@@ -168,55 +186,64 @@ public class BlockingRpcClient extends NettyClientBase {
private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) {
if(protocol != null && getChannel() != null) {
return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(),
- RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress()));
+ RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress()));
} else {
return new TajoServiceException(response.getErrorMessage());
}
}
- private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
- RpcResponse rpcResponse = (RpcResponse) e.getMessage();
- ProtoCallFuture callback = requests.remove(rpcResponse.getId());
+ if (msg instanceof RpcResponse) {
+ try {
+ RpcResponse rpcResponse = (RpcResponse) msg;
+ ProtoCallFuture callback = requests.remove(rpcResponse.getId());
- if (callback == null) {
- LOG.warn("Dangling rpc call");
- } else {
- if (rpcResponse.hasErrorMessage()) {
- callback.setFailed(rpcResponse.getErrorMessage(),
- makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
- throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
- } else {
- Message responseMessage;
-
- if (!rpcResponse.hasResponseMessage()) {
- responseMessage = null;
+ if (callback == null) {
+ LOG.warn("Dangling rpc call");
} else {
- responseMessage =
- callback.returnType.newBuilderForType().
- mergeFrom(rpcResponse.getResponseMessage()).build();
+ if (rpcResponse.hasErrorMessage()) {
+ callback.setFailed(rpcResponse.getErrorMessage(),
+ makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace())));
+ throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage()));
+ } else {
+ Message responseMessage;
+
+ if (!rpcResponse.hasResponseMessage()) {
+ responseMessage = null;
+ } else {
+ responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage())
+ .build();
+ }
+
+ callback.setResponse(responseMessage);
+ }
}
-
- callback.setResponse(responseMessage);
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- e.getChannel().close();
for(ProtoCallFuture callback: requests.values()) {
- callback.setFailed(e.getCause().getMessage(), e.getCause());
+ callback.setFailed(cause.getMessage(), cause);
}
+
if(LOG.isDebugEnabled()) {
- LOG.error("" + e.getCause().getMessage(), e.getCause());
+ LOG.error("" + cause.getMessage(), cause);
} else {
- LOG.error("RPC Exception:" + e.getCause().getMessage());
+ LOG.error("RPC Exception:" + cause.getMessage());
+ }
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
}
}
}
@@ -253,6 +280,9 @@ public class BlockingRpcClient extends NettyClientBase {
public Message get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if(sem.tryAcquire(timeout, unit)) {
+ if (ee != null) {
+ throw ee;
+ }
return response;
} else {
throw new TimeoutException();
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 9e0d57c..0ce359f 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -22,19 +22,22 @@ import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
+
+import io.netty.channel.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.*;
import org.apache.tajo.rpc.RpcProtos.RpcRequest;
import org.apache.tajo.rpc.RpcProtos.RpcResponse;
+import io.netty.util.ReferenceCountUtil;
+
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
public class BlockingRpcServer extends NettyServerBase {
private static Log LOG = LogFactory.getLog(BlockingRpcServer.class);
private final BlockingService service;
- private final ChannelPipelineFactory pipeline;
+ private final ChannelInitializer<Channel> initializer;
public BlockingRpcServer(final Class<?> protocol,
final Object instance,
@@ -53,78 +56,92 @@ public class BlockingRpcServer extends NettyServerBase {
"newReflectiveBlockingService", interfaceClass);
this.service = (BlockingService) method.invoke(null, instance);
- this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
- RpcRequest.getDefaultInstance());
+ this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
- super.init(this.pipeline, workerNum);
+ super.init(this.initializer, workerNum);
}
- private class ServerHandler extends SimpleChannelUpstreamHandler {
+ @ChannelHandler.Sharable
+ private class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
- throws Exception {
-
- accepted.add(evt.getChannel());
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.add(ctx.channel());
if(LOG.isDebugEnabled()){
LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
}
- super.channelOpen(ctx, evt);
+ super.channelRegistered(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- final RpcRequest request = (RpcRequest) e.getMessage();
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ accepted.remove(ctx.channel());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size());
+ }
+ super.channelUnregistered(ctx);
+ }
- String methodName = request.getMethodName();
- MethodDescriptor methodDescriptor =
- service.getDescriptorForType().findMethodByName(methodName);
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
- if (methodDescriptor == null) {
- throw new RemoteCallException(request.getId(),
- new NoSuchMethodException(methodName));
- }
- Message paramProto = null;
- if (request.hasRequestMessage()) {
+ if (msg instanceof RpcRequest) {
try {
- paramProto = service.getRequestPrototype(methodDescriptor)
- .newBuilderForType().mergeFrom(request.getRequestMessage()).
- build();
-
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ final RpcRequest request = (RpcRequest) msg;
+
+ String methodName = request.getMethodName();
+ MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName);
+
+ if (methodDescriptor == null) {
+ throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName));
+ }
+ Message paramProto = null;
+ if (request.hasRequestMessage()) {
+ try {
+ paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType()
+ .mergeFrom(request.getRequestMessage()).build();
+
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+ }
+ Message returnValue;
+ RpcController controller = new NettyRpcController();
+
+ try {
+ returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto);
+ } catch (Throwable t) {
+ throw new RemoteCallException(request.getId(), methodDescriptor, t);
+ }
+
+ RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId());
+
+ if (returnValue != null) {
+ builder.setResponseMessage(returnValue.toByteString());
+ }
+
+ if (controller.failed()) {
+ builder.setErrorMessage(controller.errorText());
+ }
+ ctx.writeAndFlush(builder.build());
+ } finally {
+ ReferenceCountUtil.release(msg);
}
}
- Message returnValue;
- RpcController controller = new NettyRpcController();
-
- try {
- returnValue = service.callBlockingMethod(methodDescriptor,
- controller, paramProto);
- } catch (Throwable t) {
- throw new RemoteCallException(request.getId(), methodDescriptor, t);
- }
-
- RpcResponse.Builder builder =
- RpcResponse.newBuilder().setId(request.getId());
-
- if (returnValue != null) {
- builder.setResponseMessage(returnValue.toByteString());
- }
-
- if (controller.failed()) {
- builder.setErrorMessage(controller.errorText());
- }
- e.getChannel().write(builder.build());
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- if (e.getCause() instanceof RemoteCallException) {
- RemoteCallException callException = (RemoteCallException) e.getCause();
- e.getChannel().write(callException.getResponse());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof RemoteCallException) {
+ RemoteCallException callException = (RemoteCallException) cause;
+ ctx.writeAndFlush(callException.getResponse());
+ }
+
+ if (ctx != null && ctx.channel().isActive()) {
+ ctx.channel().close();
}
}
+
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
index fd612a5..c4c3256 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java
@@ -50,14 +50,14 @@ public class CallFuture<T> implements RpcCallback<T>, Future<T> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
- // TODO - to be implemented
- throw new UnsupportedOperationException();
+ controller.startCancel();
+ sem.release();
+ return controller.isCanceled();
}
@Override
public boolean isCancelled() {
- // TODO - to be implemented
- throw new UnsupportedOperationException();
+ return controller.isCanceled();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
index 1bf0ed8..4ba19a5 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java
@@ -24,9 +24,13 @@ import com.google.protobuf.RpcController;
public class DefaultRpcController implements RpcController {
private String errorText;
private boolean error;
+ private boolean canceled;
@Override
public void reset() {
+ errorText = "";
+ error = false;
+ canceled = false;
}
@Override
@@ -41,6 +45,7 @@ public class DefaultRpcController implements RpcController {
@Override
public void startCancel() {
+ this.canceled = true;
}
@Override
@@ -51,7 +56,7 @@ public class DefaultRpcController implements RpcController {
@Override
public boolean isCanceled() {
- return false;
+ return canceled;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index bc0c567..7b52178 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -18,12 +18,16 @@
package org.apache.tajo.rpc;
+import io.netty.channel.*;
+
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.Closeable;
import java.net.InetSocketAddress;
@@ -37,7 +41,7 @@ public abstract class NettyClientBase implements Closeable {
private static final long PAUSE = 1000; // 1 sec
private int numRetries;
- protected ClientBootstrap bootstrap;
+ protected Bootstrap bootstrap;
private ChannelFuture channelFuture;
public NettyClientBase() {
@@ -46,55 +50,39 @@ public abstract class NettyClientBase implements Closeable {
public abstract <T> T getStub();
public abstract RpcConnectionPool.RpcConnectionKey getKey();
- public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory,
+ public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer,
int numRetries) throws ConnectTimeoutException {
this.numRetries = numRetries;
- init(addr, pipeFactory, factory);
+ init(addr, initializer);
}
- public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
+ public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer)
throws ConnectTimeoutException {
- this.bootstrap = new ClientBootstrap(factory);
- this.bootstrap.setPipelineFactory(pipeFactory);
- // TODO - should be configurable
- this.bootstrap.setOption("connectTimeoutMillis", 10000);
- this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
- this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
- this.bootstrap.setOption("tcpNoDelay", true);
- this.bootstrap.setOption("keepAlive", true);
+ this.bootstrap = new Bootstrap();
+ this.bootstrap
+ .channel(NioSocketChannel.class)
+ .handler(initializer)
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+ .option(ChannelOption.SO_RCVBUF, 1048576 * 10)
+ .option(ChannelOption.TCP_NODELAY, true);
connect(addr);
}
+
+ private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) {
+
+ this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
+ .connect(address)
+ .addListener(listener);
+ }
private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException {
- this.channelFuture = bootstrap.connect(addr);
-
final CountDownLatch latch = new CountDownLatch(1);
- this.channelFuture.addListener(new ChannelFutureListener() {
- private final AtomicInteger retryCount = new AtomicInteger();
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- if (numRetries > retryCount.getAndIncrement()) {
- Thread.sleep(PAUSE);
- channelFuture = bootstrap.connect(addr);
- channelFuture.addListener(this);
-
- LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect.");
- }
- else {
- latch.countDown();
-
- LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
- }
- }
- else {
- latch.countDown();
- }
- }
- });
+ GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch);
+ connectUsingNetty(addr, listener);
try {
latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS);
@@ -103,7 +91,7 @@ public abstract class NettyClientBase implements Closeable {
if (!channelFuture.isSuccess()) {
throw new ConnectTimeoutException("Connect error to " + addr +
- " caused by " + ExceptionUtils.getMessage(channelFuture.getCause()));
+ " caused by " + ExceptionUtils.getMessage(channelFuture.cause()));
}
}
@@ -115,34 +103,67 @@ public abstract class NettyClientBase implements Closeable {
handleConnectionInternally(addr);
}
- public boolean isConnected() {
- return getChannel().isConnected();
+ class RetryConnectionListener implements GenericFutureListener<ChannelFuture> {
+ private final AtomicInteger retryCount = new AtomicInteger();
+ private final InetSocketAddress address;
+ private final CountDownLatch latch;
+
+ RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) {
+ this.address = address;
+ this.latch = latch;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture channelFuture) throws Exception {
+ if (!channelFuture.isSuccess()) {
+ channelFuture.channel().close();
+
+ if (numRetries > retryCount.getAndIncrement()) {
+ final GenericFutureListener<ChannelFuture> currentListener = this;
+
+ RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() {
+ @Override
+ public void run() {
+ connectUsingNetty(address, currentListener);
+ }
+ }, PAUSE, TimeUnit.MILLISECONDS);
+
+ LOG.debug("Connecting to " + address + " has been failed. Retrying to connect.");
+ }
+ else {
+ latch.countDown();
+
+ LOG.error("Max retry count has been exceeded. attempts=" + numRetries);
+ }
+ }
+ else {
+ latch.countDown();
+ }
+ }
+ }
+
+ public boolean isActive() {
+ return getChannel().isActive();
}
public InetSocketAddress getRemoteAddress() {
- if (channelFuture == null || channelFuture.getChannel() == null) {
+ if (channelFuture == null || channelFuture.channel() == null) {
return null;
}
- return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
+ return (InetSocketAddress) channelFuture.channel().remoteAddress();
}
public Channel getChannel() {
- return channelFuture.getChannel();
+ return channelFuture.channel();
}
@Override
public void close() {
- if(this.channelFuture != null && getChannel().isOpen()) {
- try {
- getChannel().close().awaitUninterruptibly();
- } catch (Throwable ce) {
- LOG.warn(ce);
- }
+ if (channelFuture != null && getChannel().isActive()) {
+ getChannel().close();
}
- if(this.bootstrap != null) {
- // This line will shutdown the factory
- // this.bootstrap.releaseExternalResources();
+ if (this.bootstrap != null) {
InetSocketAddress address = getRemoteAddress();
if (address != null) {
LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort());
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index ef090ff..1b45ac9 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -20,19 +20,23 @@ package org.apache.tajo.rpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Random;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class NettyServerBase {
@@ -43,10 +47,10 @@ public class NettyServerBase {
protected String serviceName;
protected InetSocketAddress serverAddr;
protected InetSocketAddress bindAddress;
- protected ChannelPipelineFactory pipelineFactory;
+ protected ChannelInitializer<Channel> initializer;
protected ServerBootstrap bootstrap;
- protected Channel channel;
- protected ChannelGroup accepted = new DefaultChannelGroup();
+ protected ChannelFuture channelFuture;
+ protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private InetSocketAddress initIsa;
@@ -63,19 +67,19 @@ public class NettyServerBase {
this.serviceName = name;
}
- public void init(ChannelPipelineFactory pipeline, int workerNum) {
- ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
-
- pipelineFactory = pipeline;
- bootstrap = new ServerBootstrap(factory);
- bootstrap.setPipelineFactory(pipelineFactory);
- // TODO - should be configurable
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setOption("child.connectTimeoutMillis", 10000);
- bootstrap.setOption("child.connectResponseTimeoutMillis", 10000);
- bootstrap.setOption("child.receiveBufferSize", 1048576 * 10);
+ public void init(ChannelInitializer<Channel> initializer, int workerNum) {
+ bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+
+ this.initializer = initializer;
+ bootstrap
+ .channel(NioServerSocketChannel.class)
+ .childHandler(initializer)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
+ .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10);
}
public InetSocketAddress getListenAddress() {
@@ -98,28 +102,41 @@ public class NettyServerBase {
serverAddr = initIsa;
}
- this.channel = bootstrap.bind(serverAddr);
- this.bindAddress = (InetSocketAddress) channel.getLocalAddress();
+ this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly();
+ this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress();
LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress);
}
public Channel getChannel() {
- return this.channel;
+ return this.channelFuture.channel();
}
public void shutdown() {
- if(channel != null) {
- channel.close().awaitUninterruptibly();
- }
+ shutdown(false);
+ }
+ public void shutdown(boolean waitUntilThreadsStop) {
try {
- accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+ accepted.close();
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
}
+
if(bootstrap != null) {
- bootstrap.releaseExternalResources();
+ if (bootstrap.childGroup() != null) {
+ bootstrap.childGroup().shutdownGracefully();
+ if (waitUntilThreadsStop) {
+ bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+ }
+ }
+
+ if (bootstrap.group() != null) {
+ bootstrap.group().shutdownGracefully();
+ if (waitUntilThreadsStop) {
+ bootstrap.childGroup().terminationFuture().awaitUninterruptibly();
+ }
+ }
}
if (bindAddress != null) {
@@ -138,13 +155,14 @@ public class NettyServerBase {
// each system has a different starting port number within the given range.
private static final AtomicInteger nextPortNum =
new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange));
+ private static final Object lockObject = new Object();
private synchronized static int getUnusedPort() throws IOException {
while (true) {
int port = nextPortNum.getAndIncrement();
if (port >= endPortRange) {
- synchronized (nextPortNum) {
+ synchronized (lockObject) {
nextPortNum.set(startPortRange);
port = nextPortNum.getAndIncrement();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
index 70135a6..9b7f8ac 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java
@@ -20,7 +20,7 @@ package org.apache.tajo.rpc;
import com.google.protobuf.RpcCallback;
-public class NullCallback implements RpcCallback {
+public class NullCallback implements RpcCallback<Object> {
private final static NullCallback instance;
static {
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
new file mode 100644
index 0000000..6a340dc
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+
+import com.google.protobuf.MessageLite;
+
+class ProtoChannelInitializer extends ChannelInitializer<Channel> {
+ private final MessageLite defaultInstance;
+ private final ChannelHandler handler;
+
+ public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) {
+ this.handler = handler;
+ this.defaultInstance = defaultInstance;
+ }
+
+ @Override
+ protected void initChannel(Channel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
+ pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
+ pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
+ pipeline.addLast("protobufEncoder", new ProtobufEncoder());
+ pipeline.addLast("handler", handler);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
deleted file mode 100644
index 7aa03e7..0000000
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.MessageLite;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
-
-public class ProtoPipelineFactory implements ChannelPipelineFactory {
- private final ChannelUpstreamHandler handler;
- private final MessageLite defaultInstance;
-
- public ProtoPipelineFactory(ChannelUpstreamHandler handlerFactory,
- MessageLite defaultInstance) {
- this.handler = handlerFactory;
- this.defaultInstance = defaultInstance;
- }
-
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
- p.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance));
- p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
- p.addLast("protobufEncoder", new ProtobufEncoder());
- p.addLast("handler", handler);
- return p;
- }
-}