You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/21 02:48:54 UTC

git commit: TAJO-522: OutOfMemoryError: unable to create new native thread. (hyoungjunkim via hyunsik)

Updated Branches:
  refs/heads/master 9af2fca12 -> 948915151


TAJO-522: OutOfMemoryError: unable to create new native thread. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/94891515
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/94891515
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/94891515

Branch: refs/heads/master
Commit: 948915151b14774c606bba7a78751600d2368261
Parents: 9af2fca
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jan 21 10:48:41 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jan 21 10:48:41 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +++
 .../java/org/apache/tajo/conf/TajoConf.java     |  1 +
 .../apache/tajo/master/TajoContainerProxy.java  | 27 ++++----------------
 tajo-rpc/pom.xml                                |  4 ---
 .../org/apache/tajo/rpc/NettyClientBase.java    | 19 ++++++++++++--
 5 files changed, 26 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 104073f..08a133b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -219,6 +219,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-522: OutOfMemoryError: unable to create new native thread.
+    (hyoungjunkim via hyunsik)
+
     TAJO-518: tajo-algebra and ProjectionPushDownRule code cleanup. (hyunsik)
 
     TAJO-503: HCatalogStore can't scan several hive databases. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 4d7254a..f2fc1aa 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -179,6 +179,7 @@ public class TajoConf extends Configuration {
     // RPC
     //////////////////////////////////
     RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
+    RPC_CLIENT_SOCKET_IO_THREADS("tajo.rpc.client.socket-io-threads", 0),
 
     //////////////////////////////////
     // The Below is reserved

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index bdedd98..39a73ba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -30,7 +30,6 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.rm.TajoWorkerContainer;
 import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.rpc.AsyncRpcClient;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -62,13 +61,13 @@ public class TajoContainerProxy extends ContainerProxy {
   }
 
   private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
-    AsyncRpcClient tajoWorkerRpc = null;
+    NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
           .getQueryMasterManagerService().getBindAddr();
 
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
-      tajoWorkerRpc = new AsyncRpcClient(TajoWorkerProtocol.class, addr);
+      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
 
       TajoWorkerProtocol.RunExecutionBlockRequestProto request =
@@ -84,27 +83,11 @@ public class TajoContainerProxy extends ContainerProxy {
       tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
     } catch (Exception e) {
       //TODO retry
+      RpcConnectionPool.getPool(context.getConf()).closeConnection(tajoWorkerRpc);
+      tajoWorkerRpc = null;
       LOG.error(e.getMessage(), e);
     } finally {
-      if(tajoWorkerRpc != null) {
-        (new AyncRpcClose(tajoWorkerRpc)).start();
-      }
-    }
-  }
-
-  class AyncRpcClose extends Thread {
-    AsyncRpcClient client;
-    public AyncRpcClose(AsyncRpcClient client) {
-      this.client = client;
-    }
-
-    @Override
-    public void run() {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-      client.close();
+      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-rpc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml
index 04cf780..7f7ae79 100644
--- a/tajo-rpc/pom.xml
+++ b/tajo-rpc/pom.xml
@@ -130,10 +130,6 @@
       <artifactId>tajo-common</artifactId>
       <exclusions>
         <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
-        <exclusion>
           <groupId>joda-time</groupId>
           <artifactId>joda-time</artifactId>
         </exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/94891515/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index b41bfdd..d22fb5b 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,6 +20,7 @@ package org.apache.tajo.rpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
@@ -35,12 +36,25 @@ import java.util.concurrent.Executors;
 public abstract class NettyClientBase implements Closeable {
   private static Log LOG = LogFactory.getLog(NettyClientBase.class);
 
+  // Netty's default number of I/O worker threads: 2 x available processors
+  protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
+  protected static int nettyWorkerCount;
+
   private ClientSocketChannelFactory factory;
   protected ClientBootstrap bootstrap;
   private ChannelFuture channelFuture;
   private Channel channel;
   protected InetSocketAddress addr;
 
+  static {
+    TajoConf conf = new TajoConf();
+
+    nettyWorkerCount = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_SOCKET_IO_THREADS);
+    if (nettyWorkerCount <= 0) {
+      nettyWorkerCount = DEFAULT_IO_THREADS;
+    }
+  }
+
   public NettyClientBase() {
   }
 
@@ -53,7 +67,8 @@ public abstract class NettyClientBase implements Closeable {
     try {
       this.factory =
           new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-              Executors.newCachedThreadPool());
+              Executors.newCachedThreadPool(),
+              nettyWorkerCount);
 
       this.bootstrap = new ClientBootstrap(factory);
       this.bootstrap.setPipelineFactory(pipeFactory);
@@ -61,7 +76,7 @@ public abstract class NettyClientBase implements Closeable {
       this.bootstrap.setOption("connectTimeoutMillis", 10000);
       this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
       this.bootstrap.setOption("receiveBufferSize", 1048576*2);
-      this.bootstrap.setOption("tcpNoDelay", false);
+      this.bootstrap.setOption("tcpNoDelay", true);
       this.bootstrap.setOption("keepAlive", true);
 
       this.channelFuture = bootstrap.connect(addr);