You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/24 03:59:50 UTC
git commit: TAJO-544: Thread pool abusing. (Min Zhou via hyunsik)
Updated Branches:
refs/heads/master a6d9cbe60 -> 3125733cc
TAJO-544: Thread pool abusing. (Min Zhou via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3125733c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3125733c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3125733c
Branch: refs/heads/master
Commit: 3125733cc6f2bbce306afd00b8fa8440f07b6e1e
Parents: a6d9cbe
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Jan 24 11:50:52 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Jan 24 11:50:52 2014 +0900
----------------------------------------------------------------------
.../java/org/apache/tajo/worker/Fetcher.java | 34 ++++++++++----------
1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3125733c/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 76f3049..4f9f3fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -57,7 +57,18 @@ public class Fetcher {
private long finishTime;
private long fileLen;
private int messageReceiveCount;
- private ChannelFactory factory;
+
+
+ private static final ThreadFactory bossFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Fetcher Netty Boss #%d")
+ .build();
+ private static final ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setNameFormat("Fetcher Netty Worker #%d")
+ .build();
+ private static final ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(bossFactory),
+ Executors.newCachedThreadPool(workerFactory));
+
private ClientBootstrap bootstrap;
public Fetcher(URI uri, File file) {
@@ -75,24 +86,13 @@ public class Fetcher {
}
}
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("Fetcher Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("Fetcher Netty Worker #%d")
- .build();
-
- factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
-
bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec
bootstrap.setOption("receiveBufferSize", 1048576); // set 1M
bootstrap.setOption("tcpNoDelay", true);
- ChannelPipelineFactory factory = new HttpClientPipelineFactory(file);
- bootstrap.setPipelineFactory(factory);
+ ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(file);
+ bootstrap.setPipelineFactory(pipelineFactory);
}
public long getStartTime() {
@@ -131,7 +131,7 @@ public class Fetcher {
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
- bootstrap.releaseExternalResources();
+ future.getChannel().close();
throw new IOException(future.getCause());
}
@@ -153,8 +153,8 @@ public class Fetcher {
channelFuture.addListener(ChannelFutureListener.CLOSE);
- // Shut down executor threads to exit.
- bootstrap.releaseExternalResources();
+ // Close the channel to exit.
+ future.getChannel().close();
finishTime = System.currentTimeMillis();
return file;
}