You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/21 06:58:01 UTC
git commit: TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new native thread. (Min Zhou via hyunsik)
Updated Branches:
refs/heads/master b75ea74d5 -> e57cf426d
TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new native thread. (Min Zhou via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e57cf426
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e57cf426
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e57cf426
Branch: refs/heads/master
Commit: e57cf426d8d1d1b924a32a10f81a8912f2b0c45f
Parents: b75ea74
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jan 21 14:49:56 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jan 21 14:49:56 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 ++
.../master/querymaster/QueryInProgress.java | 13 ++++----
.../org/apache/tajo/rpc/AsyncRpcClient.java | 8 +++--
.../org/apache/tajo/rpc/BlockingRpcClient.java | 6 +++-
.../org/apache/tajo/rpc/NettyClientBase.java | 34 ++++++++++----------
5 files changed, 38 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b5b3d81..c19392d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -219,6 +219,9 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new
+ native thread. (Min Zhou via hyunsik)
+
TAJO-522: OutOfMemoryError: unable to create new native thread.
(hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 92ba2e2..2ab6c6a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -36,8 +37,9 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import java.net.InetSocketAddress;
@@ -62,7 +64,7 @@ public class QueryInProgress extends CompositeService {
private final TajoMaster.MasterContext masterContext;
- private AsyncRpcClient queryMasterRpc;
+ private NettyClientBase queryMasterRpc;
private QueryMasterProtocolService queryMasterRpcClient;
@@ -127,8 +129,7 @@ public class QueryInProgress extends CompositeService {
super.stop();
if(queryMasterRpc != null) {
- //TODO release to connection pool
- queryMasterRpc.close();
+ RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
}
}
@@ -185,8 +186,8 @@ public class QueryInProgress extends CompositeService {
InetSocketAddress addr = NetUtils.createSocketAddr(
queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
- //TODO Get Connection from pool
- queryMasterRpc = new AsyncRpcClient(QueryMasterProtocol.class, addr);
+ queryMasterRpc =
+ RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
queryMasterRpcClient = queryMasterRpc.getStub();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index e8031c8..01e1110 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -51,7 +51,11 @@ public class AsyncRpcClient extends NettyClientBase {
private RpcConnectionKey key;
- public AsyncRpcClient(final Class<?> protocol,
+ /**
+ * This constructor is intentionally package-private so that users cannot
+ * create an instance directly through it.
+ */
+ AsyncRpcClient(final Class<?> protocol,
final InetSocketAddress addr)
throws Exception {
@@ -209,7 +213,7 @@ public class AsyncRpcClient extends NettyClientBase {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
- LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
+ LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
e.getChannel().close();
for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 9388665..a0963a7 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -54,7 +54,11 @@ public class BlockingRpcClient extends NettyClientBase {
private RpcConnectionKey key;
- public BlockingRpcClient(final Class<?> protocol,
+ /**
+ * This constructor is intentionally package-private so that users cannot
+ * create an instance directly through it.
+ */
+ BlockingRpcClient(final Class<?> protocol,
final InetSocketAddress addr)
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index d22fb5b..79c2674 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -40,11 +40,14 @@ public abstract class NettyClientBase implements Closeable {
protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
protected static int nettyWorkerCount;
- private ClientSocketChannelFactory factory;
+ /**
+ * This factory is static so that all clients can share its thread pool.
+ * NioClientSocketChannelFactory exposes only one user-visible method, newChannel(), which is thread-safe.
+ */
+ private static final ClientSocketChannelFactory factory;
+
protected ClientBootstrap bootstrap;
private ChannelFuture channelFuture;
- private Channel channel;
- protected InetSocketAddress addr;
static {
TajoConf conf = new TajoConf();
@@ -53,6 +56,10 @@ public abstract class NettyClientBase implements Closeable {
if (nettyWorkerCount <= 0) {
nettyWorkerCount = DEFAULT_IO_THREADS;
}
+
+ factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool(),
+ nettyWorkerCount);
}
public NettyClientBase() {
@@ -62,14 +69,7 @@ public abstract class NettyClientBase implements Closeable {
public abstract RpcConnectionPool.RpcConnectionKey getKey();
public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory) throws IOException {
- this.addr = addr;
-
try {
- this.factory =
- new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool(),
- nettyWorkerCount);
-
this.bootstrap = new ClientBootstrap(factory);
this.bootstrap.setPipelineFactory(pipeFactory);
// TODO - should be configurable
@@ -85,7 +85,6 @@ public abstract class NettyClientBase implements Closeable {
channelFuture.getCause().printStackTrace();
throw new RuntimeException(channelFuture.getCause());
}
- this.channel = channelFuture.getChannel();
} catch (Throwable t) {
close();
throw new IOException(t.getCause());
@@ -97,24 +96,25 @@ public abstract class NettyClientBase implements Closeable {
}
public InetSocketAddress getRemoteAddress() {
- return this.addr;
+ return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
}
public Channel getChannel() {
- return this.channel;
+ return channelFuture.getChannel();
}
@Override
public void close() {
- if(this.channel != null) {
- this.channel.close().awaitUninterruptibly();
+ if(this.channelFuture != null) {
+ this.channelFuture.getChannel().close();
}
if(this.bootstrap != null) {
- this.bootstrap.releaseExternalResources();
+ // The call below is disabled because it would shut down the shared static factory.
+ // this.bootstrap.releaseExternalResources();
if(LOG.isDebugEnabled()) {
LOG.debug("Proxy is disconnected from " +
- addr.getAddress().getHostAddress() + ":" + addr.getPort());
+ getRemoteAddress().getHostName() + ":" + getRemoteAddress().getPort());
}
}
}