You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/21 02:48:54 UTC
git commit: TAJO-522: OutOfMemoryError: unable to create new native thread. (hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master 9af2fca12 -> 948915151
TAJO-522: OutOfMemoryError: unable to create new native thread. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/94891515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/94891515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/94891515
Branch: refs/heads/master
Commit: 948915151b14774c606bba7a78751600d2368261
Parents: 9af2fca
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jan 21 10:48:41 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jan 21 10:48:41 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../java/org/apache/tajo/conf/TajoConf.java | 1 +
.../apache/tajo/master/TajoContainerProxy.java | 27 ++++----------------
tajo-rpc/pom.xml | 4 ---
.../org/apache/tajo/rpc/NettyClientBase.java | 19 ++++++++++++--
5 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 104073f..08a133b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -219,6 +219,9 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-522: OutOfMemoryError: unable to create new native thread.
+ (hyoungjunkim via hyunsik)
+
TAJO-518: tajo-algebra and ProjectionPushDownRule code cleanup. (hyunsik)
TAJO-503: HCatalogStore can't scan several hive databases. (jaehwa)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 4d7254a..f2fc1aa 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -179,6 +179,7 @@ public class TajoConf extends Configuration {
// RPC
//////////////////////////////////
RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
+ RPC_CLIENT_SOCKET_IO_THREADS("tajo.rpc.client.socket-io-threads", 0),
//////////////////////////////////
// The Below is reserved
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index bdedd98..39a73ba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -30,7 +30,6 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.rm.TajoWorkerContainer;
import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.rpc.AsyncRpcClient;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
@@ -62,13 +61,13 @@ public class TajoContainerProxy extends ContainerProxy {
}
private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
- AsyncRpcClient tajoWorkerRpc = null;
+ NettyClientBase tajoWorkerRpc = null;
try {
InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
.getQueryMasterManagerService().getBindAddr();
InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
- tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
+ tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -84,27 +83,11 @@ public class TajoContainerProxy extends ContainerProxy {
tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
} catch (Exception e) {
//TODO retry
+ RpcConnectionPool.getPool(context.getConf()).closeConnection(tajoWorkerRpc);
+ tajoWorkerRpc = null;
LOG.error(e.getMessage(), e);
} finally {
- if(tajoWorkerRpc != null) {
- (new AyncRpcClose(tajoWorkerRpc)).start();
- }
- }
- }
-
- class AyncRpcClose extends Thread {
- AsyncRpcClient client;
- public AyncRpcClose(AsyncRpcClient client) {
- this.client = client;
- }
-
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- client.close();
+ RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 04cf780..7f7ae79 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -130,10 +130,6 @@
<artifactId>tajo-common</artifactId>
<exclusions>
<exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</exclusion>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index b41bfdd..d22fb5b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,6 +20,7 @@ package org.apache.tajo.rpc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -35,12 +36,25 @@ import java.util.concurrent.Executors;
public abstract class NettyClientBase implements Closeable {
private static Log LOG = LogFactory.getLog(NettyClientBase.class);
+ //netty default value
+ protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
+ protected static int nettyWorkerCount;
+
private ClientSocketChannelFactory factory;
protected ClientBootstrap bootstrap;
private ChannelFuture channelFuture;
private Channel channel;
protected InetSocketAddress addr;
+ static {
+ TajoConf conf = new TajoConf();
+
+ nettyWorkerCount = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_SOCKET_IO_THREADS);
+ if (nettyWorkerCount <= 0) {
+ nettyWorkerCount = DEFAULT_IO_THREADS;
+ }
+ }
+
public NettyClientBase() {
}
@@ -53,7 +67,8 @@ public abstract class NettyClientBase implements Closeable {
try {
this.factory =
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool());
+ Executors.newCachedThreadPool(),
+ nettyWorkerCount);
this.bootstrap = new ClientBootstrap(factory);
this.bootstrap.setPipelineFactory(pipeFactory);
@@ -61,7 +76,7 @@ public abstract class NettyClientBase implements Closeable {
this.bootstrap.setOption("connectTimeoutMillis", 10000);
this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
this.bootstrap.setOption("receiveBufferSize", 1048576*2);
- this.bootstrap.setOption("tcpNoDelay", false);
+ this.bootstrap.setOption("tcpNoDelay", true);
this.bootstrap.setOption("keepAlive", true);
this.channelFuture = bootstrap.connect(addr);