You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/24 03:59:50 UTC

git commit: TAJO-544: Thread pool abusing. (Min Zhou via hyunsik)

Updated Branches:
  refs/heads/master a6d9cbe60 -> 3125733cc


TAJO-544: Thread pool abusing. (Min Zhou via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3125733c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3125733c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3125733c

Branch: refs/heads/master
Commit: 3125733cc6f2bbce306afd00b8fa8440f07b6e1e
Parents: a6d9cbe
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 24 11:50:52 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 24 11:50:52 2014 +0900

----------------------------------------------------------------------
 .../java/org/apache/tajo/worker/Fetcher.java    | 34 ++++++++++----------
 1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3125733c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 76f3049..4f9f3fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -57,7 +57,18 @@ public class Fetcher {
   private long finishTime;
   private long fileLen;
   private int messageReceiveCount;
-  private ChannelFactory factory;
+
+
+  private static final ThreadFactory bossFactory = new ThreadFactoryBuilder()
+      .setNameFormat("Fetcher Netty Boss #%d")
+      .build();
+  private static final ThreadFactory workerFactory = new ThreadFactoryBuilder()
+      .setNameFormat("Fetcher Netty Worker #%d")
+      .build();
+  private static final ChannelFactory factory = new NioClientSocketChannelFactory(
+      Executors.newCachedThreadPool(bossFactory),
+      Executors.newCachedThreadPool(workerFactory));
+
   private ClientBootstrap bootstrap;
 
   public Fetcher(URI uri, File file) {
@@ -75,24 +86,13 @@ public class Fetcher {
       }
     }
 
-    ThreadFactory bossFactory = new ThreadFactoryBuilder()
-        .setNameFormat("Fetcher Netty Boss #%d")
-        .build();
-    ThreadFactory workerFactory = new ThreadFactoryBuilder()
-        .setNameFormat("Fetcher Netty Worker #%d")
-        .build();
-
-    factory = new NioClientSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory));
-
     bootstrap = new ClientBootstrap(factory);
     bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
     bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
     bootstrap.setOption("tcpNoDelay", true);
 
-    ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
-    bootstrap.setPipelineFactory(factory);
+    ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
+    bootstrap.setPipelineFactory(pipelineFactory);
   }
 
   public long getStartTime() {
@@ -131,7 +131,7 @@ public class Fetcher {
     // Wait until the connection attempt succeeds or fails.
     Channel channel = future.awaitUninterruptibly().getChannel();
     if (!future.isSuccess()) {
-      bootstrap.releaseExternalResources();
+      future.getChannel().close();
       throw new IOException(future.getCause());
     }
 
@@ -153,8 +153,8 @@ public class Fetcher {
 
     channelFuture.addListener(ChannelFutureListener.CLOSE);
 
-    // Shut down executor threads to exit.
-    bootstrap.releaseExternalResources();
+    // Close the channel to exit.
+    future.getChannel().close();
     finishTime = System.currentTimeMillis();
     return file;
   }