You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/21 06:58:01 UTC

git commit: TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new native thread. (Min Zhou via hyunsik)

Updated Branches:
  refs/heads/master b75ea74d5 -> e57cf426d


TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new native thread. (Min Zhou via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e57cf426
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e57cf426
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e57cf426

Branch: refs/heads/master
Commit: e57cf426d8d1d1b924a32a10f81a8912f2b0c45f
Parents: b75ea74
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jan 21 14:49:56 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jan 21 14:49:56 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../master/querymaster/QueryInProgress.java     | 13 ++++----
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  8 +++--
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  6 +++-
 .../org/apache/tajo/rpc/NettyClientBase.java    | 34 ++++++++++----------
 5 files changed, 38 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b5b3d81..c19392d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -219,6 +219,9 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-537: After TAJO-522, still OutOfMemoryError: unable to create new
+    native thread. (Min Zhou via hyunsik)
+
     TAJO-522: OutOfMemoryError: unable to create new native thread.
     (hyoungjunkim via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 92ba2e2..2ab6c6a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -36,8 +37,9 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.master.rm.WorkerResourceManager;
-import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 
 import java.net.InetSocketAddress;
@@ -62,7 +64,7 @@ public class QueryInProgress extends CompositeService {
 
   private final TajoMaster.MasterContext masterContext;
 
-  private AsyncRpcClient queryMasterRpc;
+  private NettyClientBase queryMasterRpc;
 
   private QueryMasterProtocolService queryMasterRpcClient;
 
@@ -127,8 +129,7 @@ public class QueryInProgress extends CompositeService {
     super.stop();
 
     if(queryMasterRpc != null) {
-      //TODO release to connection pool
-      queryMasterRpc.close();
+      RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
     }
   }
 
@@ -185,8 +186,8 @@ public class QueryInProgress extends CompositeService {
       InetSocketAddress addr = NetUtils.createSocketAddr(
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
       LOG.info("Connect to QueryMaster:" + addr);
-      //TODO Get Connection from pool
-      queryMasterRpc = new AsyncRpcClient(QueryMasterProtocol.class, addr);
+      queryMasterRpc =
+          RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
       queryMasterRpcClient = queryMasterRpc.getStub();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index e8031c8..01e1110 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -51,7 +51,11 @@ public class AsyncRpcClient extends NettyClientBase {
 
   private RpcConnectionKey key;
 
-  public AsyncRpcClient(final Class<?> protocol,
+  /**
+   * This constructor is intentionally package-private so that users cannot
+   * create an instance directly with {@code new}.
+   */
+  AsyncRpcClient(final Class<?> protocol,
                         final InetSocketAddress addr)
       throws Exception {
 
@@ -209,7 +213,7 @@ public class AsyncRpcClient extends NettyClientBase {
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
         throws Exception {
-      LOG.error(addr + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
+      LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
       e.getChannel().close();
 
       for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 9388665..a0963a7 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -54,7 +54,11 @@ public class BlockingRpcClient extends NettyClientBase {
 
   private RpcConnectionKey key;
 
-  public BlockingRpcClient(final Class<?> protocol,
+  /**
+   * This constructor is intentionally package-private so that users cannot
+   * create an instance directly with {@code new}.
+   */
+  BlockingRpcClient(final Class<?> protocol,
                            final InetSocketAddress addr)
       throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e57cf426/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index d22fb5b..79c2674 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -40,11 +40,14 @@ public abstract class NettyClientBase implements Closeable {
   protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
   protected static int nettyWorkerCount;
 
-  private ClientSocketChannelFactory factory;
+  /**
+   * This factory is static so that all clients can share its thread pool.
+   * NioClientSocketChannelFactory exposes only one method to users, newChannel(), which is thread-safe.
+   */
+  private static final ClientSocketChannelFactory factory;
+
   protected ClientBootstrap bootstrap;
   private ChannelFuture channelFuture;
-  private Channel channel;
-  protected InetSocketAddress addr;
 
   static {
     TajoConf conf = new TajoConf();
@@ -53,6 +56,10 @@ public abstract class NettyClientBase implements Closeable {
     if (nettyWorkerCount <= 0) {
       nettyWorkerCount = DEFAULT_IO_THREADS;
     }
+
+    factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+        Executors.newCachedThreadPool(),
+        nettyWorkerCount);
   }
 
   public NettyClientBase() {
@@ -62,14 +69,7 @@ public abstract class NettyClientBase implements Closeable {
   public abstract RpcConnectionPool.RpcConnectionKey getKey();
 
   public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory) throws IOException {
-    this.addr = addr;
-
     try {
-      this.factory =
-          new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-              Executors.newCachedThreadPool(),
-              nettyWorkerCount);
-
       this.bootstrap = new ClientBootstrap(factory);
       this.bootstrap.setPipelineFactory(pipeFactory);
       // TODO - should be configurable
@@ -85,7 +85,6 @@ public abstract class NettyClientBase implements Closeable {
         channelFuture.getCause().printStackTrace();
         throw new RuntimeException(channelFuture.getCause());
       }
-      this.channel = channelFuture.getChannel();
     } catch (Throwable t) {
       close();
       throw new IOException(t.getCause());
@@ -97,24 +96,25 @@ public abstract class NettyClientBase implements Closeable {
   }
 
   public InetSocketAddress getRemoteAddress() {
-    return this.addr;
+    return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress();
   }
 
   public Channel getChannel() {
-    return this.channel;
+    return channelFuture.getChannel();
   }
 
   @Override
   public void close() {
-    if(this.channel != null) {
-      this.channel.close().awaitUninterruptibly();
+    if(this.channelFuture != null) {
+      this.channelFuture.getChannel().close();
     }
 
     if(this.bootstrap != null) {
-      this.bootstrap.releaseExternalResources();
+      // Intentionally disabled: the call below would shut down the factory shared by all clients.
+      // this.bootstrap.releaseExternalResources();
       if(LOG.isDebugEnabled()) {
         LOG.debug("Proxy is disconnected from " +
-            addr.getAddress().getHostAddress() + ":" + addr.getPort());
+            getRemoteAddress().getHostName() + ":" + getRemoteAddress().getPort());
       }
     }
   }