You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/02/17 03:51:44 UTC

[1/2] TAJO-598: Refactoring Tajo RPC. (jinho)

Repository: incubator-tajo
Updated Branches:
  refs/heads/master cd7bbae0d -> e2a7dffdb


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 8a0b63a..c31e9cd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -48,10 +48,12 @@ import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.ApplicationIdUtils;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -69,16 +71,16 @@ public class Task {
   private final TajoConf systemConf;
   private final QueryContext queryContext;
   private final FileSystem localFS;
-  private final TaskRunner.TaskRunnerContext taskRunnerContext;
+  private TaskRunner.TaskRunnerContext taskRunnerContext;
   private final QueryMasterProtocolService.Interface masterProxy;
   private final LocalDirAllocator lDirAllocator;
   private final QueryUnitAttemptId taskId;
 
   private final Path taskDir;
   private final QueryUnitRequest request;
-  private final TaskAttemptContext context;
+  private TaskAttemptContext context;
   private List<Fetcher> fetcherRunners;
-  private final LogicalNode plan;
+  private LogicalNode plan;
   private final Map<String, TableDesc> descs = Maps.newHashMap();
   private PhysicalExec executor;
   private boolean interQuery;
@@ -107,6 +109,7 @@ public class Task {
   private ShuffleType shuffleType = null;
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
+  private ClientSocketChannelFactory channelFactory = null;
 
   static final String OUTPUT_FILE_PREFIX="part-";
   static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
@@ -281,12 +284,14 @@ public class Task {
     context.stop();
     context.setState(TaskAttemptState.TA_KILLED);
     setProgressFlag();
+    releaseChannelFactory();
   }
 
   public void abort() {
     aborted = true;
     context.stop();
     context.setState(TaskAttemptState.TA_FAILED);
+    releaseChannelFactory();
   }
 
   public void cleanUp() {
@@ -294,7 +299,6 @@ public class Task {
 
     if (context.getState() == TaskAttemptState.TA_SUCCEEDED) {
       try {
-        // context.getWorkDir() 지우기
         localFS.delete(context.getWorkDir(), true);
         synchronized (taskRunnerContext.getTasks()) {
           taskRunnerContext.getTasks().remove(this.getId());
@@ -348,6 +352,7 @@ public class Task {
       FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
       context.updateAssignedFragments(inputTable, frags);
     }
+    releaseChannelFactory();
   }
 
   public void run() {
@@ -371,6 +376,7 @@ public class Task {
           ++progress;
         }
         this.executor.close();
+        this.executor = null;
       }
     } catch (Exception e) {
       // errorMessage will be sent to master.
@@ -435,6 +441,13 @@ public class Task {
   public void cleanupTask() {
     taskRunnerContext.addTaskHistory(getId(), getTaskHistory());
     taskRunnerContext.getTasks().remove(getId());
+    taskRunnerContext = null;
+
+    fetcherRunners.clear();
+    executor = null;
+    plan = null;
+    context = null;
+    releaseChannelFactory();
   }
 
   public TaskHistory getTaskHistory() {
@@ -515,7 +528,7 @@ public class Task {
     return tablets;
   }
 
-  private class FetchRunner implements Runnable {
+  private static class FetchRunner implements Runnable {
     private final TaskAttemptContext ctx;
     private final Fetcher fetcher;
 
@@ -538,7 +551,7 @@ public class Task {
             } catch (InterruptedException e) {
               LOG.error(e);
             }
-            LOG.info("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
+            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
           }
           try {
             File fetched = fetcher.get();
@@ -560,10 +573,24 @@ public class Task {
     }
   }
 
+  private void releaseChannelFactory(){
+    if(channelFactory != null) {
+      channelFactory.shutdown();
+      channelFactory.releaseExternalResources();
+      channelFactory = null;
+    }
+  }
+
   private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
                                         List<Fetch> fetches) throws IOException {
 
     if (fetches.size() > 0) {
+
+      releaseChannelFactory();
+
+
+      int workerNum = ctx.getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM);
+      channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum);
       Path inputDir = lDirAllocator.
           getLocalPathToRead(
               getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
@@ -578,7 +605,7 @@ public class Task {
           storeDir.mkdirs();
         }
         storeFile = new File(storeDir, "in_" + i);
-        Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile);
+        Fetcher fetcher = new Fetcher(URI.create(f.getUrls()), storeFile, channelFactory);
         runnerList.add(fetcher);
         i++;
       }
@@ -589,74 +616,74 @@ public class Task {
     }
   }
 
-  protected class Reporter implements Runnable {
+  protected class Reporter {
     private QueryMasterProtocolService.Interface masterStub;
     private Thread pingThread;
-    private Object lock = new Object();
+    private AtomicBoolean stop = new AtomicBoolean(false);
     private static final int PROGRESS_INTERVAL = 3000;
 
     public Reporter(QueryMasterProtocolService.Interface masterStub) {
       this.masterStub = masterStub;
     }
 
-    @Override
-    public void run() {
-      final int MAX_RETRIES = 3;
-      int remainingRetries = MAX_RETRIES;
+    Runnable createReporterThread() {
 
-      while (!stopped) {
-        try {
-          synchronized (lock) {
-            if (stopped) {
-              break;
-            }
-            lock.wait(PROGRESS_INTERVAL);
-          }
-          if (stopped) {
-            break;
-          }
-          resetProgressFlag();
-
-          if (getProgressFlag()) {
-            resetProgressFlag();
-            masterStub.statusUpdate(null, getReport(), NullCallback.get());
-          } else {
-            masterStub.ping(null, taskId.getProto(), NullCallback.get());
-          }
-
-        } catch (Throwable t) {
+      return new Runnable() {
+        final int MAX_RETRIES = 3;
+        int remainingRetries = MAX_RETRIES;
+        @Override
+        public void run() {
+          while (!stop.get() && !stopped) {
+            try {
 
-          LOG.info("Communication exception: " + StringUtils
-              .stringifyException(t));
-          remainingRetries -=1;
-          if (remainingRetries == 0) {
-            ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
-            LOG.warn("Last retry, killing ");
-            System.exit(65);
+              resetProgressFlag();
+
+              if (getProgressFlag()) {
+                resetProgressFlag();
+                masterStub.statusUpdate(null, getReport(), NullCallback.get());
+              } else {
+                masterStub.ping(null, taskId.getProto(), NullCallback.get());
+              }
+              synchronized (pingThread) {
+                pingThread.wait(PROGRESS_INTERVAL);
+              }
+
+            } catch (Throwable t) {
+
+              LOG.info("Communication exception: " + StringUtils
+                  .stringifyException(t));
+              remainingRetries -=1;
+              if (remainingRetries == 0) {
+                ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
+                LOG.warn("Last retry, killing ");
+                System.exit(65);
+              }
+            }
           }
         }
-      }
+      };
     }
 
     public void startCommunicationThread() {
       if (pingThread == null) {
-        pingThread = new Thread(this, "communication thread");
-        pingThread.setDaemon(true);
+        pingThread = new Thread(createReporterThread());
+        pingThread.setName("communication thread");
         pingThread.start();
       }
     }
 
     public void stopCommunicationThread() throws InterruptedException {
+      if(stop.getAndSet(true)){
+        return;
+      }
+
       if (pingThread != null) {
         // Intent of the lock is to not send an interupt in the middle of an
         // umbilical.ping or umbilical.statusUpdate
-        synchronized(lock) {
+        synchronized(pingThread) {
           //Interrupt if sleeping. Otherwise wait for the RPC call to return.
-          lock.notify();
+          pingThread.notifyAll();
         }
-
-        pingThread.interrupt();
-        pingThread.join();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index dab18b5..7b49c15 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -78,12 +78,8 @@ public class TaskRunner extends AbstractService {
 
   private TajoQueryEngine queryEngine;
 
-  // TODO - this should be configurable
-  private final int coreNum = 4;
-
   // for Fetcher
-  private final ExecutorService fetchLauncher =
-      Executors.newFixedThreadPool(coreNum * 4);
+  private final ExecutorService fetchLauncher;
   // It keeps all of the query unit attempts while a TaskRunner is running.
   private final Map<QueryUnitAttemptId, Task> tasks =
       new ConcurrentHashMap<QueryUnitAttemptId, Task>();
@@ -118,7 +114,8 @@ public class TaskRunner extends AbstractService {
 
     this.taskRunnerManager = taskRunnerManager;
     this.connPool = RpcConnectionPool.getPool(conf);
-
+    this.fetchLauncher = Executors.newFixedThreadPool(
+        conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
     try {
       final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
 
@@ -220,6 +217,9 @@ public class TaskRunner extends AbstractService {
       }
     }
 
+    tasks.clear();
+    fetchLauncher.shutdown();
+    this.queryEngine = null;
 //    if(client != null) {
 //      client.close();
 //      client = null;
@@ -352,7 +352,7 @@ public class TaskRunner extends AbstractService {
                 }
                 // if there has been no assigning task for a given period,
                 // TaskRunner will retry to request an assigning task.
-                LOG.warn("Timeout GetTask:" + getId() + ", but retry", te);
+                LOG.info("Retry assigning task:" + getId());
                 continue;
               }
 
@@ -400,8 +400,6 @@ public class TaskRunner extends AbstractService {
                 }
               }
             } catch (Throwable t) {
-              connPool.closeConnection(qmClient);
-              qmClient = null;
               t.printStackTrace();
             } finally {
               connPool.releaseConnection(qmClient);
@@ -410,8 +408,6 @@ public class TaskRunner extends AbstractService {
         }
       });
       taskLauncher.start();
-      taskLauncher.join();
-
     } catch (Throwable t) {
       LOG.fatal("Unhandled exception. Starting shutdown.", t);
     } finally {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 0489e37..9d7e438 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -18,102 +18,20 @@
 
 package org.apache.tajo.engine.eval;
 
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.apache.tajo.common.TajoDataTypes.Type.*;
 import static org.junit.Assert.*;
 
 public class TestEvalTree extends ExprTestBase{
-  private static TajoTestingCluster util;
-  private static CatalogService cat;
-  private static Tuple [] tuples = new Tuple[3];
-  
-  @BeforeClass
-  public static void setUp() throws Exception {
-    util = new TajoTestingCluster();
-    util.startCatalogCluster();
-    cat = util.getMiniCatalogCluster().getCatalog();
-    for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      cat.createFunction(funcDesc);
-    }
-
-    Schema schema = new Schema();
-    schema.addColumn("name", TEXT);
-    schema.addColumn("score", INT4);
-    schema.addColumn("age", INT4);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-    TableDesc desc = new TableDesc("people", schema, meta, CommonTestingUtil.getTestDir());
-    cat.addTable(desc);
-
-    FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
-        CatalogUtil.newSimpleDataType(INT4),
-        CatalogUtil.newSimpleDataTypeArray(INT4, INT4));
-    cat.createFunction(funcMeta);
-    
-    tuples[0] = new VTuple(3);
-    tuples[0].put(new Datum[] {
-        DatumFactory.createText("aabc"),
-        DatumFactory.createInt4(100),
-        DatumFactory.createInt4(10)});
-    tuples[1] = new VTuple(3);
-    tuples[1].put(new Datum[] {
-        DatumFactory.createText("aaba"),
-        DatumFactory.createInt4(200),
-        DatumFactory.createInt4(20)});
-    tuples[2] = new VTuple(3);
-    tuples[2].put(new Datum[] {
-        DatumFactory.createText("kabc"),
-        DatumFactory.createInt4(300),
-        DatumFactory.createInt4(30)});
-  }
-
-  @AfterClass
-  public static void tearDown() throws Exception {
-    util.shutdownCatalogCluster();
-  }
-
-  public static class TestSum extends GeneralFunction {
-    private Integer x;
-    private Integer y;
-
-    public TestSum() {
-      super(new Column[] { new Column("arg1", INT4),
-          new Column("arg2", INT4) });
-    }
-
-    @Override
-    public Datum eval(Tuple params) {
-      x =  params.get(0).asInt4();
-      y =  params.get(1).asInt4();
-      return DatumFactory.createInt4(x + y);
-    }
-  }
-
-  static String[] QUERIES = {
-      "select name, score, age from people where score > 30", // 0
-      "select name, score, age from people where score * age", // 1
-      "select name, score, age from people where test_sum(score * age, 50)", // 2
-      "select 2+3", // 3
-      "select sum(score) from people", // 4
-      "select name from people where NOT (20 > 30)", // 5
-  };
-  
   @Test
   public void testTupleEval() throws CloneNotSupportedException {
     ConstEval e1 = new ConstEval(DatumFactory.createInt4(1));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index c25afc8..d756242 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -27,8 +27,9 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.TestEvalTree.TestSum;
+import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.LogicalPlanner;
@@ -38,6 +39,7 @@ import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -48,6 +50,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.tajo.common.TajoDataTypes.Type.INT4;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -59,7 +62,22 @@ public class TestEvalTreeUtil {
   static EvalNode expr3;
   static SQLAnalyzer analyzer;
   static LogicalPlanner planner;
+  public static class TestSum extends GeneralFunction {
+    private Integer x;
+    private Integer y;
 
+    public TestSum() {
+      super(new Column[] { new Column("arg1", INT4),
+          new Column("arg2", INT4) });
+    }
+
+    @Override
+    public Datum eval(Tuple params) {
+      x =  params.get(0).asInt4();
+      y =  params.get(1).asInt4();
+      return DatumFactory.createInt4(x + y);
+    }
+  }
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -88,9 +106,15 @@ public class TestEvalTreeUtil {
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
 
-    expr1 = getRootSelection(TestEvalTree.QUERIES[0]);
-    expr2 = getRootSelection(TestEvalTree.QUERIES[1]);
-    expr3 = getRootSelection(TestEvalTree.QUERIES[2]);
+    String[] QUERIES = {
+        "select name, score, age from people where score > 30", // 0
+        "select name, score, age from people where score * age", // 1
+        "select name, score, age from people where test_sum(score * age, 50)", // 2
+    };
+
+    expr1 = getRootSelection(QUERIES[0]);
+    expr2 = getRootSelection(QUERIES[1]);
+    expr3 = getRootSelection(QUERIES[2]);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index 3cf69a7..2d3124d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -36,6 +36,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -87,6 +88,11 @@ public class TestGlobalPlanner {
         StorageManagerFactory.getStorageManager(util.getConfiguration()));
   }
 
+  @AfterClass
+  public static void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
   private MasterPlan buildPlan(String sql) throws PlanningException, IOException {
     Expr expr = sqlAnalyzer.parse(sql);
     LogicalPlan plan = planner.createPlan(expr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
index 0181889..aa4a20c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -22,13 +22,15 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.dataserver.HttpDataServer;
 import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
 import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -70,7 +72,8 @@ public class TestFetcher {
     InetSocketAddress addr = server.getBindAddress();
     
     URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
-    Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"));
+    ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+    Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory);
     fetcher.get();
     server.stop();
     

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index da34ac7..a427635 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.pullserver;
 
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +44,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.retriever.FileChunk;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
@@ -54,7 +54,6 @@ import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.codec.frame.TooLongFrameException;
 import org.jboss.netty.handler.codec.http.*;
 import org.jboss.netty.handler.ssl.SslHandler;
@@ -73,8 +72,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -116,7 +113,7 @@ public class TajoPullServerService extends AbstractService {
 
   private static final Map<String,String> userRsrc =
     new ConcurrentHashMap<String,String>();
-  private static String userName;
+  private String userName;
 
   public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
     "tajo.pullserver.ssl.file.buffer.size";
@@ -194,7 +191,7 @@ public class TajoPullServerService extends AbstractService {
   }
 
   @Override
-  public synchronized void init(Configuration conf) {
+  public void init(Configuration conf) {
     try {
       manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
           DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
@@ -202,16 +199,10 @@ public class TajoPullServerService extends AbstractService {
       readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
           DEFAULT_SHUFFLE_READAHEAD_BYTES);
 
-      ThreadFactory bossFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Boss #%d")
-          .build();
-      ThreadFactory workerFactory = new ThreadFactoryBuilder()
-          .setNameFormat("PullServerAuxService Netty Worker #%d")
-          .build();
+      int workerNum = conf.getInt("tajo.shuffle.rpc.server.io-thread-num",
+          Runtime.getRuntime().availableProcessors() * 2);
 
-      selector = new NioServerSocketChannelFactory(
-          Executors.newCachedThreadPool(bossFactory),
-          Executors.newCachedThreadPool(workerFactory));
+      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum);
 
       localFS = new LocalFileSystem();
       super.init(new Configuration(conf));
@@ -228,7 +219,6 @@ public class TajoPullServerService extends AbstractService {
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.setOption("tcpNoDelay", true);
 
     try {
       pipelineFact = new HttpPipelineFactory(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index 01e1110..c84d6b6 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -26,6 +26,7 @@ import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -36,7 +37,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey;
 
 public class AsyncRpcClient extends NettyClientBase {
-  private static final Log LOG = LogFactory.getLog(RpcProtos.class);
+  private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class);
 
   private final ChannelUpstreamHandler handler;
   private final ChannelPipelineFactory pipeFactory;
@@ -56,7 +57,12 @@ public class AsyncRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   AsyncRpcClient(final Class<?> protocol,
-                        final InetSocketAddress addr)
+                 final InetSocketAddress addr) throws Exception {
+    this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
+  }
+
+  AsyncRpcClient(final Class<?> protocol,
+                        final InetSocketAddress addr, ClientSocketChannelFactory factory)
       throws Exception {
 
     this.protocol = protocol;
@@ -68,8 +74,8 @@ public class AsyncRpcClient extends NettyClientBase {
     this.handler = new ClientChannelUpstreamHandler();
     pipeFactory = new ProtoPipelineFactory(handler,
         RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory);
-    rpcChannel = new ProxyRpcChannel(getChannel());
+    super.init(addr, pipeFactory, factory);
+    rpcChannel = new ProxyRpcChannel();
     this.key = new RpcConnectionKey(addr, protocol, true);
   }
 
@@ -92,12 +98,10 @@ public class AsyncRpcClient extends NettyClientBase {
   }
 
   private class ProxyRpcChannel implements RpcChannel {
-    private final Channel channel;
     private final ClientChannelUpstreamHandler handler;
 
-    public ProxyRpcChannel(Channel channel) {
-      this.channel = channel;
-      this.handler = channel.getPipeline()
+    public ProxyRpcChannel() {
+      this.handler = getChannel().getPipeline()
           .get(ClientChannelUpstreamHandler.class);
 
       if (handler == null) {
@@ -119,7 +123,7 @@ public class AsyncRpcClient extends NettyClientBase {
       handler.registerCallback(nextSeqId,
           new ResponseCallback(controller, responseType, done));
 
-      channel.write(rpcRequest);
+      getChannel().write(rpcRequest);
     }
 
     private Message buildRequest(int seqId,
@@ -214,7 +218,6 @@ public class AsyncRpcClient extends NettyClientBase {
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
         throws Exception {
       LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause());
-      e.getChannel().close();
 
       for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) {
         ResponseCallback callback = callbackEntry.getValue();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 4dc1d05..b7e3cb6 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -40,7 +40,8 @@ public class AsyncRpcServer extends NettyServerBase {
 
   public AsyncRpcServer(final Class<?> protocol,
                         final Object instance,
-                        final InetSocketAddress bindAddress)
+                        final InetSocketAddress bindAddress,
+                        final int workerNum)
       throws Exception {
     super(protocol.getSimpleName(), bindAddress);
 
@@ -54,12 +55,23 @@ public class AsyncRpcServer extends NettyServerBase {
     ServerHandler handler = new ServerHandler();
     this.pipeline = new ProtoPipelineFactory(handler,
         RpcRequest.getDefaultInstance());
-    super.init(this.pipeline);
+    super.init(this.pipeline, workerNum);
   }
 
   private class ServerHandler extends SimpleChannelUpstreamHandler {
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+        throws Exception {
+
+      accepted.add(evt.getChannel());
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+      }
+      super.channelOpen(ctx, evt);
+    }
+
+    @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
         throws Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index a0963a7..604ed52 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -29,6 +29,7 @@ import org.apache.tajo.rpc.RpcProtos.RpcRequest;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
@@ -59,7 +60,12 @@ public class BlockingRpcClient extends NettyClientBase {
    * new an instance through this constructor.
    */
   BlockingRpcClient(final Class<?> protocol,
-                           final InetSocketAddress addr)
+                 final InetSocketAddress addr) throws Exception {
+    this(protocol, addr, RpcChannelFactory.getSharedClientChannelFactory());
+  }
+
+  BlockingRpcClient(final Class<?> protocol,
+                           final InetSocketAddress addr, ClientSocketChannelFactory factory)
       throws Exception {
 
     this.protocol = protocol;
@@ -72,8 +78,8 @@ public class BlockingRpcClient extends NettyClientBase {
     this.handler = new ClientChannelUpstreamHandler();
     pipeFactory = new ProtoPipelineFactory(handler,
         RpcResponse.getDefaultInstance());
-    super.init(addr, pipeFactory);
-    rpcChannel = new ProxyRpcChannel(getChannel());
+    super.init(addr, pipeFactory, factory);
+    rpcChannel = new ProxyRpcChannel();
 
     this.key = new RpcConnectionKey(addr, protocol, false);
   }
@@ -97,12 +103,12 @@ public class BlockingRpcClient extends NettyClientBase {
   }
 
   private class ProxyRpcChannel implements BlockingRpcChannel {
-    private final Channel channel;
+
     private final ClientChannelUpstreamHandler handler;
 
-    public ProxyRpcChannel(Channel channel) {
-      this.channel = channel;
-      this.handler = channel.getPipeline().
+    public ProxyRpcChannel() {
+
+      this.handler = getChannel().getPipeline().
           get(ClientChannelUpstreamHandler.class);
 
       if (handler == null) {
@@ -124,7 +130,7 @@ public class BlockingRpcClient extends NettyClientBase {
       ProtoCallFuture callFuture =
           new ProtoCallFuture(controller, responsePrototype);
       requests.put(nextSeqId, callFuture);
-      channel.write(rpcRequest);
+      getChannel().write(rpcRequest);
 
       try {
         return callFuture.get();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 3649c5e..067d824 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -38,7 +38,8 @@ public class BlockingRpcServer extends NettyServerBase {
 
   public BlockingRpcServer(final Class<?> protocol,
                            final Object instance,
-                           final InetSocketAddress bindAddress)
+                           final InetSocketAddress bindAddress,
+                           final int workerNum)
       throws Exception {
 
     super(protocol.getSimpleName(), bindAddress);
@@ -55,12 +56,23 @@ public class BlockingRpcServer extends NettyServerBase {
     this.pipeline = new ProtoPipelineFactory(new ServerHandler(),
         RpcRequest.getDefaultInstance());
 
-    super.init(this.pipeline);
+    super.init(this.pipeline, workerNum);
   }
 
   private class ServerHandler extends SimpleChannelUpstreamHandler {
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+        throws Exception {
+
+      accepted.add(evt.getChannel());
+      if(LOG.isDebugEnabled()){
+        LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size()));
+      }
+      super.channelOpen(ctx, evt);
+    }
+
+    @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
         throws Exception {
       final RpcRequest request = (RpcRequest) e.getMessage();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 79c2674..8373c37 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -20,77 +20,56 @@ package org.apache.tajo.rpc;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.conf.TajoConf;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
 
 public abstract class NettyClientBase implements Closeable {
   private static Log LOG = LogFactory.getLog(NettyClientBase.class);
 
-  //netty default value
-  protected static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
-  protected static int nettyWorkerCount;
-
-  /**
-   * make this factory static thus all clients can share its thread pool.
-   * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-   */
-  private static final ClientSocketChannelFactory factory;
-
   protected ClientBootstrap bootstrap;
   private ChannelFuture channelFuture;
 
-  static {
-    TajoConf conf = new TajoConf();
-
-    nettyWorkerCount = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_SOCKET_IO_THREADS);
-    if (nettyWorkerCount <= 0) {
-      nettyWorkerCount = DEFAULT_IO_THREADS;
-    }
-
-    factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-        Executors.newCachedThreadPool(),
-        nettyWorkerCount);
-  }
-
   public NettyClientBase() {
   }
 
   public abstract <T> T getStub();
   public abstract RpcConnectionPool.RpcConnectionKey getKey();
 
-  public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory) throws IOException {
+  public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory)
+      throws IOException {
     try {
       this.bootstrap = new ClientBootstrap(factory);
       this.bootstrap.setPipelineFactory(pipeFactory);
       // TODO - should be configurable
       this.bootstrap.setOption("connectTimeoutMillis", 10000);
       this.bootstrap.setOption("connectResponseTimeoutMillis", 10000);
-      this.bootstrap.setOption("receiveBufferSize", 1048576*2);
+      this.bootstrap.setOption("receiveBufferSize", 1048576 * 10);
       this.bootstrap.setOption("tcpNoDelay", true);
       this.bootstrap.setOption("keepAlive", true);
 
-      this.channelFuture = bootstrap.connect(addr);
-      this.channelFuture.awaitUninterruptibly();
-      if (!channelFuture.isSuccess()) {
-        channelFuture.getCause().printStackTrace();
-        throw new RuntimeException(channelFuture.getCause());
-      }
+      connect(addr);
     } catch (Throwable t) {
       close();
       throw new IOException(t.getCause());
     }
   }
 
+  public void connect(InetSocketAddress addr) {
+    this.channelFuture = bootstrap.connect(addr);
+    this.channelFuture.awaitUninterruptibly();
+    if (!channelFuture.isSuccess()) {
+      channelFuture.getCause().printStackTrace();
+      throw new RuntimeException(channelFuture.getCause());
+    }
+  }
+
   public boolean isConnected() {
     return getChannel().isConnected();
   }
@@ -105,8 +84,12 @@ public abstract class NettyClientBase implements Closeable {
 
   @Override
   public void close() {
-    if(this.channelFuture != null) {
-      this.channelFuture.getChannel().close();
+    if(this.channelFuture != null && getChannel().isOpen()) {
+      try {
+        getChannel().close().awaitUninterruptibly();
+      } catch (Throwable ce) {
+        LOG.warn(ce);
+      }
     }
 
     if(this.bootstrap != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index 154fb9b..371f879 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -25,14 +25,15 @@ import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
 
 import java.io.IOException;
 import java.net.DatagramSocket;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.Random;
-import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class NettyServerBase {
@@ -43,10 +44,10 @@ public class NettyServerBase {
   protected String serviceName;
   protected InetSocketAddress serverAddr;
   protected InetSocketAddress bindAddress;
-  protected ChannelFactory factory;
   protected ChannelPipelineFactory pipelineFactory;
   protected ServerBootstrap bootstrap;
   protected Channel channel;
+  protected ChannelGroup accepted = new DefaultChannelGroup();
 
   private InetSocketAddress initIsa;
 
@@ -63,21 +64,19 @@ public class NettyServerBase {
     this.serviceName = name;
   }
 
-  public void init(ChannelPipelineFactory pipeline) {
-    this.factory =
-        new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
-            Executors.newCachedThreadPool());
+  public void init(ChannelPipelineFactory pipeline, int workerNum) {
+    ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
 
     pipelineFactory = pipeline;
     bootstrap = new ServerBootstrap(factory);
     bootstrap.setPipelineFactory(pipelineFactory);
     // TODO - should be configurable
     bootstrap.setOption("reuseAddress", true);
-    bootstrap.setOption("child.tcpNoDelay", false);
+    bootstrap.setOption("child.tcpNoDelay", true);
     bootstrap.setOption("child.keepAlive", true);
     bootstrap.setOption("child.connectTimeoutMillis", 10000);
     bootstrap.setOption("child.connectResponseTimeoutMillis", 10000);
-    bootstrap.setOption("child.receiveBufferSize", 1048576 * 2);
+    bootstrap.setOption("child.receiveBufferSize", 1048576 * 10);
   }
 
   public InetSocketAddress getListenAddress() {
@@ -114,9 +113,16 @@ public class NettyServerBase {
     if(channel != null) {
       channel.close().awaitUninterruptibly();
     }
-    if(factory != null) {
-      factory.releaseExternalResources();
+
+    try {
+      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
     }
+    if(bootstrap != null) {
+      bootstrap.releaseExternalResources();
+    }
+
     LOG.info("Rpc (" + serviceName + ") listened on "
         + NetUtils.normalizeInetSocketAddress(bindAddress)+ ") shutdown");
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
new file mode 100644
index 0000000..adafd5c
--- /dev/null
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.*;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.ThreadNameDeterminer;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class RpcChannelFactory {
+  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
+  private static ClientSocketChannelFactory factory;
+  private static AtomicInteger clientCount = new AtomicInteger(0);
+  private static AtomicInteger serverCount = new AtomicInteger(0);
+
+  private RpcChannelFactory(){
+  }
+
+  /**
+   * Make this factory static so that all clients can share its thread pool.
+   * NioClientSocketChannelFactory exposes only one method to users, newChannel(), which is thread-safe.
+   */
+  public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(){
+    //shared worker and boss pool
+    if(factory == null){
+      TajoConf conf = new TajoConf();
+      int workerNum = conf.getIntVar(TajoConf.ConfVars.INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM);
+      factory = createClientChannelFactory("Internal-Client", workerNum);
+    }
+    return factory;
+  }
+
+  // Caller must release the external resources
+  public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) {
+    name = name + "-" + clientCount.incrementAndGet();
+    if(LOG.isDebugEnabled()){
+      LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum);
+    }
+
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build();
+    ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build();
+
+    NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1,
+        new HashedWheelTimer(), ThreadNameDeterminer.CURRENT);
+    NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum,
+        ThreadNameDeterminer.CURRENT);
+
+    return new NioClientSocketChannelFactory(bossPool, workerPool);
+  }
+
+  // Caller must release the external resources
+  public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) {
+    name = name + "-" + serverCount.incrementAndGet();
+    if(LOG.isInfoEnabled()){
+      LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
+    }
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
+    ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
+
+    NioServerBossPool bossPool =
+        new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT);
+    NioWorkerPool workerPool =
+        new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT);
+
+    return new NioServerSocketChannelFactory(bossPool, workerPool);
+  }
+
+  public static synchronized void shutdown(){
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Shutdown Shared RPC Pool");
+    }
+    factory.releaseExternalResources();
+    factory = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index dee42d1..777281f 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -18,55 +18,95 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.common.base.Objects;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.TUtil;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 
 import java.net.InetSocketAddress;
-import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
 public class RpcConnectionPool {
   private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class);
 
-  private Map<RpcConnectionKey, NettyClientBase> connections = TUtil.newConcurrentHashMap();
+  private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections =
+      new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>();
+  private ChannelGroup accepted = new DefaultChannelGroup();
 
   private static RpcConnectionPool instance;
+  private final ClientSocketChannelFactory channelFactory;
+  private final TajoConf conf;
 
-  private TajoConf conf;
-
-  private RpcConnectionPool(TajoConf conf) {
+  private RpcConnectionPool(TajoConf conf, ClientSocketChannelFactory channelFactory) {
     this.conf = conf;
+    this.channelFactory =  channelFactory;
   }
 
   public synchronized static RpcConnectionPool getPool(TajoConf conf) {
     if(instance == null) {
-      instance = new RpcConnectionPool(conf);
+      instance = new RpcConnectionPool(conf, RpcChannelFactory.getSharedClientChannelFactory());
     }
-
     return instance;
   }
 
+  public synchronized static RpcConnectionPool newPool(TajoConf conf, String poolName, int workerNum) {
+    return new RpcConnectionPool(conf, RpcChannelFactory.createClientChannelFactory(poolName, workerNum));
+  }
+
   private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws Exception {
+    NettyClientBase client;
     if(rpcConnectionKey.asyncMode) {
-      return new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr);
+      client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
     } else {
-      return new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr);
+      client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory);
     }
+    accepted.add(client.getChannel());
+    return client;
   }
 
   public NettyClientBase getConnection(InetSocketAddress addr,
-      Class protocolClass, boolean asyncMode) throws Exception {
+                                       Class protocolClass, boolean asyncMode) throws Exception {
     RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode);
-    synchronized(connections) {
-      if(!connections.containsKey(key)) {
-        connections.put(key, makeConnection(key));
+    NettyClientBase client = connections.get(key);
+
+    if (client == null) {
+      client = makeConnection(key);
+      boolean added = connections.putIfAbsent(key, client) == null;
+
+      if (!added) {
+        client.close();
+        client = connections.get(key);
       }
-      return connections.get(key);
     }
+
+    if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) {
+      LOG.warn("Try to reconnect : " + addr);
+      client.connect(addr);
+    }
+    return client;
   }
 
   public void releaseConnection(NettyClientBase client) {
+    if (client == null) return;
+
+    try {
+      if (!client.getChannel().isOpen()) {
+        connections.remove(client.getKey());
+        client.close();
+      }
+
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size());
+
+      }
+    } catch (Exception e) {
+      LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
+    }
   }
 
   public void closeConnection(NettyClientBase client) {
@@ -76,12 +116,12 @@ public class RpcConnectionPool {
 
     try {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("CloseConnection [" + client.getKey() + "]");
-      }
-      synchronized(connections) {
-        connections.remove(client.getKey());
+        LOG.debug("Close connection [" + client.getKey() + "]");
       }
+
+      connections.remove(client.getKey());
       client.close();
+
     } catch (Exception e) {
       LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
     }
@@ -99,7 +139,20 @@ public class RpcConnectionPool {
           LOG.error("close client pool error", e);
         }
       }
-      connections.clear();
+    }
+
+    connections.clear();
+    try {
+      accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
+    } catch (Throwable t) {
+      LOG.error(t);
+    }
+  }
+
+  public synchronized void shutdown(){
+    close();
+    if(channelFactory != null){
+      channelFactory.releaseExternalResources();
     }
   }
 
@@ -117,7 +170,7 @@ public class RpcConnectionPool {
 
     @Override
     public String toString() {
-      return protocolClass + "," + addr + "," + asyncMode;
+      return "["+ protocolClass + "] " + addr + "," + asyncMode;
     }
 
     @Override
@@ -131,7 +184,7 @@ public class RpcConnectionPool {
 
     @Override
     public int hashCode() {
-      return toString().hashCode();
+      return Objects.hashCode(addr, asyncMode);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 83914e5..97bc28d 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -38,7 +38,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
 
 public class TestAsyncRpc {
   private static Log LOG = LogFactory.getLog(TestAsyncRpc.class);
@@ -56,7 +55,7 @@ public class TestAsyncRpc {
   public void setUp() throws Exception {
     service = new DummyProtocolAsyncImpl();
     server = new AsyncRpcServer(DummyProtocol.class,
-        service, new InetSocketAddress("127.0.0.1", 0));
+        service, new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new AsyncRpcClient(DummyProtocol.class,
         NetUtils.getConnectAddress(server.getListenAddress()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 6cd5f25..ba2b919 100644
--- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -48,7 +48,7 @@ public class TestBlockingRpc {
   public void setUp() throws Exception {
     service = new DummyProtocolBlockingImpl();
     server = new BlockingRpcServer(DummyProtocol.class, service,
-        new InetSocketAddress("127.0.0.1", 0));
+        new InetSocketAddress("127.0.0.1", 0), 2);
     server.start();
     client = new BlockingRpcClient(DummyProtocol.class,
         NetUtils.getConnectAddress(server.getListenAddress()));


[2/2] git commit: TAJO-598: Refactoring Tajo RPC. (jinho)

Posted by jh...@apache.org.
TAJO-598: Refactoring Tajo RPC. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e2a7dffd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e2a7dffd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e2a7dffd

Branch: refs/heads/master
Commit: e2a7dffdb652c77dd35a917134cef15b3d54d274
Parents: cd7bbae
Author: jinossy <ji...@gmail.com>
Authored: Mon Feb 17 11:50:38 2014 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Feb 17 11:50:38 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tajo/catalog/CatalogServer.java  |   7 +-
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  12 +-
 .../java/org/apache/tajo/client/TajoClient.java |  17 +--
 .../java/org/apache/tajo/conf/TajoConf.java     |  27 +++-
 .../org/apache/tajo/benchmark/SimpleQuery.java  |   2 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  16 +--
 .../planner/physical/AggregationExec.java       |   1 +
 .../engine/planner/physical/BNLJoinExec.java    |  25 +++-
 .../planner/physical/BSTIndexScanExec.java      |  12 +-
 .../planner/physical/BinaryPhysicalExec.java    |   8 +-
 .../planner/physical/ExternalSortExec.java      |  31 +++--
 .../planner/physical/HashAggregateExec.java     |   2 +
 .../planner/physical/HashFullOuterJoinExec.java |   8 ++
 .../engine/planner/physical/HashJoinExec.java   |  11 +-
 .../planner/physical/HashLeftOuterJoinExec.java |  10 +-
 .../physical/HashShuffleFileWriteExec.java      |  18 ++-
 .../engine/planner/physical/MemSortExec.java    |  11 +-
 .../physical/MergeFullOuterJoinExec.java        |  18 ++-
 .../engine/planner/physical/MergeJoinExec.java  |  20 ++-
 .../engine/planner/physical/ProjectionExec.java |   8 +-
 .../physical/RangeShuffleFileWriteExec.java     |   7 +-
 .../physical/RightOuterMergeJoinExec.java       |  18 ++-
 .../engine/planner/physical/SeqScanExec.java    |   9 +-
 .../engine/planner/physical/StoreTableExec.java |  16 ++-
 .../planner/physical/UnaryPhysicalExec.java     |   1 +
 .../tajo/master/DefaultTaskScheduler.java       |  12 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   4 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   5 -
 .../java/org/apache/tajo/master/TajoMaster.java |   6 +
 .../tajo/master/TajoMasterClientService.java    |   5 +-
 .../apache/tajo/master/TajoMasterService.java   |   3 +-
 .../apache/tajo/master/YarnContainerProxy.java  |  10 +-
 .../tajo/master/YarnTaskRunnerLauncherImpl.java |   2 +-
 .../master/querymaster/QueryInProgress.java     |  16 +--
 .../tajo/master/querymaster/QueryMaster.java    |  22 +---
 .../querymaster/QueryMasterManagerService.java  |   8 +-
 .../master/querymaster/QueryMasterTask.java     |   8 +-
 .../master/rm/TajoWorkerResourceManager.java    |   4 +-
 .../tajo/master/rm/YarnTajoResourceManager.java |   2 +-
 .../reporter/MetricsFileScheduledReporter.java  |   1 +
 .../reporter/TajoMetricsScheduledReporter.java  |   2 +-
 .../java/org/apache/tajo/worker/Fetcher.java    |  18 +--
 .../tajo/worker/TajoResourceAllocator.java      |  15 ++-
 .../java/org/apache/tajo/worker/TajoWorker.java |  24 ++--
 .../tajo/worker/TajoWorkerClientService.java    |  10 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   5 +-
 .../main/java/org/apache/tajo/worker/Task.java  | 125 +++++++++++--------
 .../java/org/apache/tajo/worker/TaskRunner.java |  18 ++-
 .../apache/tajo/engine/eval/TestEvalTree.java   |  86 +------------
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  32 ++++-
 .../apache/tajo/master/TestGlobalPlanner.java   |   6 +
 .../org/apache/tajo/worker/TestFetcher.java     |   9 +-
 .../tajo/pullserver/TajoPullServerService.java  |  22 +---
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  23 ++--
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |  16 ++-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  22 ++--
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |  16 ++-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  55 +++-----
 .../org/apache/tajo/rpc/NettyServerBase.java    |  28 +++--
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 102 +++++++++++++++
 .../org/apache/tajo/rpc/RpcConnectionPool.java  |  97 ++++++++++----
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |   3 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |   2 +-
 64 files changed, 735 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2ef8284..9e21181 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-598: Refactoring Tajo RPC. (jinho)
+
     TAJO-592: HCatalogStore should supports RCFile and default hive field delimiter. (jaehwa)
 
     TAJO-548: Investigate frequent young gc. (Min Zhou via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 62d6e27..cf13a9d 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -149,10 +149,9 @@ public class CatalogServer extends AbstractService {
   public void start() {
     String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
+    int workerNum = conf.getIntVar(ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM);
     try {
-      this.rpcServer = new BlockingRpcServer(
-          CatalogProtocol.class,
-          handler, initIsa);
+      this.rpcServer = new BlockingRpcServer(CatalogProtocol.class, handler, initIsa, workerNum);
       this.rpcServer.start();
 
       this.bindAddress = NetUtils.getConnectAddress(this.rpcServer.getListenAddress());
@@ -175,7 +174,7 @@ public class CatalogServer extends AbstractService {
     try {
       store.close();
     } catch (IOException ioe) {
-      LOG.error(ioe);
+      LOG.error(ioe.getMessage(), ioe);
     }
     super.stop();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index be4be3f..57a7294 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -202,8 +202,8 @@ public class TajoCli {
         continue;
 
       } else if (line.charAt(0) == '\\') { // command mode
-        executeCommand(line);
         ((PersistentHistory)reader.getHistory()).flush();
+        executeCommand(line);
 
       } else if (line.endsWith(";") && !line.endsWith("\\;")) {
 
@@ -330,11 +330,14 @@ public class TajoCli {
     try {
 
       QueryStatus status;
+      int initRetries = 0;
+      int progressRetries = 0;
       while (true) {
-        // TODO - configurable
-        Thread.sleep(1000);
+        // TODO - configurable
         status = client.getQueryStatus(queryId);
         if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) {
+          Thread.sleep(Math.min(20 * initRetries, 1000));
+          initRetries++;
           continue;
         }
 
@@ -347,6 +350,9 @@ public class TajoCli {
 
         if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
           break;
+        } else {
+          Thread.sleep(Math.min(200 * progressRetries, 1000));
+          progressRetries += 2;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 2846398..3aeb40e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -73,8 +73,9 @@ public class TajoClient {
     this.conf = conf;
     this.conf.set("tajo.disk.scheduler.report.interval", "0");
     this.tajoMasterAddr = addr;
-
-    connPool = RpcConnectionPool.getPool(conf);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
+    //Don't share connection pool per client
+    connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
   }
 
   public TajoClient(InetSocketAddress addr) throws IOException {
@@ -87,7 +88,7 @@ public class TajoClient {
 
   public void close() {
     if(connPool != null) {
-      connPool.close();
+      connPool.shutdown();
     }
     queryMasterMap.clear();
   }
@@ -193,7 +194,7 @@ public class TajoClient {
       } catch (Exception e) {
         throw new ServiceException(e.getMessage(), e);
       } finally {
-        connPool.closeConnection(qmClient);
+        connPool.releaseConnection(qmClient);
       }
     } else {
       NettyClientBase tmClient = null;
@@ -217,13 +218,13 @@ public class TajoClient {
           } catch (Exception e) {
             throw new ServiceException(e.getMessage(), e);
           } finally {
-            connPool.closeConnection(qmClient);
+            connPool.releaseConnection(qmClient);
           }
         }
       } catch (Exception e) {
         throw new ServiceException(e.getMessage(), e);
       } finally {
-        connPool.closeConnection(tmClient);
+        connPool.releaseConnection(tmClient);
       }
     }
     return new QueryStatus(res);
@@ -307,7 +308,7 @@ public class TajoClient {
     } catch (Exception e) {
       throw new ServiceException(e.getMessage(), e);
     } finally {
-      connPool.closeConnection(client);
+      connPool.releaseConnection(client);
     }
   }
 
@@ -490,7 +491,7 @@ public class TajoClient {
       LOG.debug("Error when checking for application status", e);
       return false;
     } finally {
-      connPool.closeConnection(tmClient);
+      connPool.releaseConnection(tmClient);
     }
 
     return true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 7c82d72..25caf13 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -137,6 +137,7 @@ public class TajoConf extends Configuration {
     PULLSERVER_PORT("tajo.pullserver.port", 0),
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
+    SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2),
 
     //////////////////////////////////
     // Storage Configuration
@@ -184,7 +185,31 @@ public class TajoConf extends Configuration {
     // RPC
     //////////////////////////////////
     RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
-    RPC_CLIENT_SOCKET_IO_THREADS("tajo.rpc.client.socket-io-threads", 0),
+
+    //Internal RPC Client
+    INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+
+    //Internal RPC Server
+    MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+    QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM("tajo.querymaster.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+    WORKER_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+    CATALOG_RPC_SERVER_WORKER_THREAD_NUM("tajo.catalog.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+    SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM("tajo.shuffle.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 2),
+
+    // Client RPC
+    RPC_CLIENT_WORKER_THREAD_NUM("tajo.rpc.client.worker-thread-num", 4),
+
+    //Client service RPC Server
+    MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.master.service.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 1),
+    WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM("tajo.worker.service.rpc.server.worker-thread-num",
+        Runtime.getRuntime().availableProcessors() * 1),
 
     //////////////////////////////////
     // The Below is reserved

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
index fb8fe5f..af6af8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/SimpleQuery.java
@@ -21,7 +21,7 @@ package org.apache.tajo.benchmark;
 import java.io.IOException;
 
 public class SimpleQuery extends TPCH {
-  private final String BENCHMARK_DIR = "benchmark/simple";
+  private static final String BENCHMARK_DIR = "benchmark/simple";
 
   public void loadQueries() throws IOException {
     loadQueries(BENCHMARK_DIR);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
index d11941d..2e12b1d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -38,14 +38,14 @@ public class TPCH extends BenchmarkSet {
   private final Log LOG = LogFactory.getLog(TPCH.class);
   private final String BENCHMARK_DIR = "benchmark/tpch";
 
-  public static String LINEITEM = "lineitem";
-  public static String CUSTOMER = "customer";
-  public static String NATION = "nation";
-  public static String PART = "part";
-  public static String REGION = "region";
-  public static String ORDERS = "orders";
-  public static String PARTSUPP = "partsupp";
-  public static String SUPPLIER = "supplier";
+  public static final String LINEITEM = "lineitem";
+  public static final String CUSTOMER = "customer";
+  public static final String NATION = "nation";
+  public static final String PART = "part";
+  public static final String REGION = "region";
+  public static final String ORDERS = "orders";
+  public static final String PARTSUPP = "partsupp";
+  public static final String SUPPLIER = "supplier";
 
   public static final Map<String, Long> tableVolumes = Maps.newHashMap();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index eb69703..208973e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -64,5 +64,6 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
   @Override
   public void close() throws IOException {
     super.close();
+    plan = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index b39a9f1..71581e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -34,12 +34,12 @@ import java.util.List;
 
 public class BNLJoinExec extends BinaryPhysicalExec {
   // from logical plan
-  private final JoinNode plan;
+  private JoinNode plan;
   private final boolean hasJoinQual;
-  private final EvalNode joinQual;
+  private EvalNode joinQual;
 
-  private final List<Tuple> leftTupleSlots;
-  private final List<Tuple> rightTupleSlots;
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> rightTupleSlots;
   private Iterator<Tuple> leftIterator;
   private Iterator<Tuple> rightIterator;
 
@@ -55,7 +55,7 @@ public class BNLJoinExec extends BinaryPhysicalExec {
   private final int TUPLE_SLOT_SIZE = 10000;
 
   // projection
-  private final Projector projector;
+  private Projector projector;
 
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -205,4 +205,19 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     rightIterator = rightTupleSlots.iterator();
     leftIterator = leftTupleSlots.iterator();
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    rightTupleSlots.clear();
+    leftTupleSlots.clear();
+    rightTupleSlots = null;
+    leftTupleSlots = null;
+    rightIterator = null;
+    leftIterator = null;
+    plan = null;
+    joinQual = null;
+    projector = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 2ff6fc9..753dcc8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.EvalNode;
@@ -38,7 +39,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   private EvalNode qual;
   private BSTIndex.BSTIndexReader reader;
   
-  private final Projector projector;
+  private Projector projector;
   
   private Datum[] datum = null;
   
@@ -125,8 +126,11 @@ public class BSTIndexScanExec extends PhysicalExec {
 
   @Override
   public void close() throws IOException {
-    reader.close();
-    fileScanner.close();
+    IOUtils.cleanup(null, reader, fileScanner);
+    reader = null;
+    fileScanner = null;
+    scanNode = null;
+    qual = null;
+    projector = null;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index f1e3a00..fc8d25d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -18,14 +18,14 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public abstract class BinaryPhysicalExec extends PhysicalExec {
-  protected final PhysicalExec leftChild;
-  protected final PhysicalExec rightChild;
+  protected PhysicalExec leftChild;
+  protected PhysicalExec rightChild;
 
   public BinaryPhysicalExec(final TaskAttemptContext context,
                             final Schema inSchema, final Schema outSchema,
@@ -59,5 +59,7 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
   public void close() throws IOException {
     leftChild.close();
     rightChild.close();
+    leftChild = null;
+    rightChild = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index f0ac290..791781e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -61,7 +62,7 @@ public class ExternalSortExec extends SortExec {
   /** Class logger */
   private static final Log LOG = LogFactory.getLog(ExternalSortExec.class);
 
-  private final SortNode plan;
+  private SortNode plan;
   private final TableMeta meta;
   /** the defaultFanout of external sort */
   private final int defaultFanout;
@@ -70,9 +71,9 @@ public class ExternalSortExec extends SortExec {
   /** the number of available cores */
   private final int allocatedCoreNum;
   /** If there are available multiple cores, it tries parallel merge. */
-  private final ExecutorService executorService;
+  private ExecutorService executorService;
   /** used for in-memory sort of each chunk. */
-  private final List<Tuple> inMemoryTable;
+  private List<Tuple> inMemoryTable;
   /** temporal dir */
   private final Path sortTmpDir;
   /** It enables round-robin disks allocation */
@@ -512,8 +513,8 @@ public class ExternalSortExec extends SortExec {
    * Two-way merger scanner that reads two input sources and outputs one output tuples sorted in some order.
    */
   private class PairWiseMerger implements Scanner {
-    private final Scanner leftScan;
-    private final Scanner rightScan;
+    private Scanner leftScan;
+    private Scanner rightScan;
 
     private Tuple leftTuple;
     private Tuple rightTuple;
@@ -564,9 +565,11 @@ public class ExternalSortExec extends SortExec {
       init();
     }
 
+    @Override
     public void close() throws IOException {
-      leftScan.close();
-      rightScan.close();
+      IOUtils.cleanup(LOG, leftScan, rightScan);
+      leftScan = null;
+      rightScan = null;
     }
 
     @Override
@@ -602,6 +605,7 @@ public class ExternalSortExec extends SortExec {
   public void close() throws IOException {
     if (result != null) {
       result.close();
+      result = null;
     }
 
     if (finalOutputFiles != null) {
@@ -610,7 +614,18 @@ public class ExternalSortExec extends SortExec {
       }
     }
 
-    inMemoryTable.clear();
+    if(inMemoryTable != null){
+      inMemoryTable.clear();
+      inMemoryTable = null;
+    }
+
+    if(executorService != null){
+      executorService.shutdown();
+      executorService = null;
+    }
+
+    plan = null;
+    super.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index a4153dc..1f8d000 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -109,5 +109,7 @@ public class HashAggregateExec extends AggregationExec {
   public void close() throws IOException {
     super.close();
     hashTable.clear();
+    hashTable = null;
+    iterator = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 848e362..70dd10b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -233,8 +233,16 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
     shouldGetLeftTuple = true;
   }
 
+  @Override
   public void close() throws IOException {
+    super.close();
     tupleSlots.clear();
+    matched.clear();
+    tupleSlots = null;
+    matched = null;
+    iterator = null;
+    plan = null;
+    joinQual = null;
   }
 
   public JoinNode getPlan() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 08b7035..51d0b4c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -179,8 +179,17 @@ public class HashJoinExec extends BinaryPhysicalExec {
     shouldGetLeftTuple = true;
   }
 
+  @Override
   public void close() throws IOException {
-    tupleSlots.clear();
+    super.close();
+    if (tupleSlots != null) {
+      tupleSlots.clear();
+      tupleSlots = null;
+    }
+
+    iterator = null;
+    plan = null;
+    joinQual = null;
   }
 
   public JoinNode getPlan() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 314b3d9..93383a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -59,7 +59,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   protected boolean shouldGetLeftTuple = true;
 
   // projection
-  protected final Projector projector;
+  protected Projector projector;
 
   private int rightNumCols;
   private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class);
@@ -192,8 +192,16 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     shouldGetLeftTuple = true;
   }
 
+
+  @Override
   public void close() throws IOException {
+    super.close();
     tupleSlots.clear();
+    tupleSlots = null;
+    iterator = null;
+    plan = null;
+    joinQual = null;
+    projector = null;
   }
 
   public JoinNode getPlan() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index c09ec19..e2b926d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -46,11 +46,11 @@ import java.util.Map;
  */
 public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   private static Log LOG = LogFactory.getLog(HashShuffleFileWriteExec.class);
-  private final ShuffleFileWriteNode plan;
+  private ShuffleFileWriteNode plan;
   private final TableMeta meta;
-  private final Partitioner partitioner;
+  private Partitioner partitioner;
   private final Path storeTablePath;
-  private final Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
+  private Map<Integer, Appender> appenderMap = new HashMap<Integer, Appender>();
   private final int numShuffleOutputs;
   private final int [] shuffleKeyIds;
   
@@ -143,4 +143,16 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   public void rescan() throws IOException {
     // nothing to do   
   }
+
+  @Override
+  public void close() throws IOException{
+    super.close();
+    if (appenderMap != null) {
+      appenderMap.clear();
+      appenderMap = null;
+    }
+
+    partitioner = null;
+    plan = null;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 46d061a..9f4f20a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -30,7 +30,7 @@ import java.util.Iterator;
 import java.util.List;
 
 public class MemSortExec extends SortExec {
-  private final SortNode plan;
+  private SortNode plan;
   private List<Tuple> tupleSlots;
   private boolean sorted = false;
   private Iterator<Tuple> iterator;
@@ -74,6 +74,15 @@ public class MemSortExec extends SortExec {
     sorted = true;
   }
 
+  @Override
+  public void close() throws IOException {
+    super.close();
+    tupleSlots.clear();
+    tupleSlots = null;
+    iterator = null;
+    plan = null;
+  }
+
   public SortNode getPlan() {
     return this.plan;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 1d6da3f..613e072 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -48,8 +48,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   private Tuple outTuple = null;
   private Tuple leftNext = null;
 
-  private final List<Tuple> leftTupleSlots;
-  private final List<Tuple> rightTupleSlots;
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> rightTupleSlots;
 
   private JoinTupleComparator joincomparator = null;
   private TupleComparator[] tupleComparator = null;
@@ -59,7 +59,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   private boolean end = false;
 
   // projection
-  private final Projector projector;
+  private Projector projector;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -320,4 +320,16 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
     posRightTupleSlots = -1;
     posLeftTupleSlots = -1;
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    leftTupleSlots.clear();
+    rightTupleSlots.clear();
+    leftTupleSlots = null;
+    rightTupleSlots = null;
+    joinNode = null;
+    joinQual = null;
+    projector = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e1d377e..f72e87e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -47,8 +47,8 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   private Tuple outTuple = null;
   private Tuple outerNext = null;
 
-  private final List<Tuple> outerTupleSlots;
-  private final List<Tuple> innerTupleSlots;
+  private List<Tuple> outerTupleSlots;
+  private List<Tuple> innerTupleSlots;
   private Iterator<Tuple> outerIterator;
   private Iterator<Tuple> innerIterator;
 
@@ -60,7 +60,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   private boolean end = false;
 
   // projection
-  private final Projector projector;
+  private Projector projector;
 
   public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
       PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -176,4 +176,18 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     outerIterator = outerTupleSlots.iterator();
     innerIterator = innerTupleSlots.iterator();
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    outerTupleSlots.clear();
+    innerTupleSlots.clear();
+    outerTupleSlots = null;
+    innerTupleSlots = null;
+    outerIterator = null;
+    innerIterator = null;
+    joinQual = null;
+    projector = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index ecc6dd0..e205751 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -30,7 +30,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 import java.io.IOException;
 
 public class ProjectionExec extends UnaryPhysicalExec {
-  private final Projectable plan;
+  private Projectable plan;
 
   // for projection
   private Tuple outTuple;
@@ -60,4 +60,10 @@ public class ProjectionExec extends UnaryPhysicalExec {
     projector.eval(tuple, outTuple);
     return outTuple;
   }
+
+  @Override
+  public void close() throws IOException{
+    super.close();
+    plan = null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 13573eb..698e46e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
@@ -117,12 +118,14 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     super.close();
 
     appender.flush();
-    appender.close();
+    IOUtils.cleanup(LOG, appender);
     indexWriter.flush();
-    indexWriter.close();
+    IOUtils.cleanup(LOG, indexWriter);
 
     // Collect statistics data
     context.setResultStats(appender.getStats());
     context.addShuffleFileOutput(0, context.getTaskId().toString());
+    appender = null;
+    indexWriter = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 365faba..b494544 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -47,8 +47,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   private Tuple outTuple = null;
   private Tuple nextLeft = null;
 
-  private final List<Tuple> leftTupleSlots;
-  private final List<Tuple> innerTupleSlots;
+  private List<Tuple> leftTupleSlots;
+  private List<Tuple> innerTupleSlots;
 
   private JoinTupleComparator joinComparator = null;
   private TupleComparator[] tupleComparator = null;
@@ -58,7 +58,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   private boolean end = false;
 
   // projection
-  private final Projector projector;
+  private Projector projector;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -330,5 +330,17 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
     posRightTupleSlots = -1;
     posLeftTupleSlots = -1;
   }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    leftTupleSlots.clear();
+    innerTupleSlots.clear();
+    leftTupleSlots = null;
+    innerTupleSlots = null;
+    joinNode = null;
+    joinQual = null;
+    projector = null;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 31a944c..a0c0eeb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -43,7 +44,7 @@ import java.util.Set;
 
 
 public class SeqScanExec extends PhysicalExec {
-  private final ScanNode plan;
+  private ScanNode plan;
   private Scanner scanner = null;
 
   private EvalNode qual = null;
@@ -186,7 +187,11 @@ public class SeqScanExec extends PhysicalExec {
 
   @Override
   public void close() throws IOException {
-    scanner.close();
+    IOUtils.cleanup(null, scanner);
+    scanner = null;
+    plan = null;
+    qual = null;
+    projector = null;
   }
 
   public String getTableName() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index f097d0c..786148b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -36,7 +36,7 @@ import java.io.IOException;
  * This is a physical executor to store a table part into a specified storage.
  */
 public class StoreTableExec extends UnaryPhysicalExec {
-  private final PersistentStoreNode plan;
+  private PersistentStoreNode plan;
   private Appender appender;
   private Tuple tuple;
 
@@ -88,11 +88,15 @@ public class StoreTableExec extends UnaryPhysicalExec {
   public void close() throws IOException {
     super.close();
 
-    appender.flush();
-    appender.close();
+    if(appender != null){
+      appender.flush();
+      appender.close();
+      // Collect statistics data
+      context.setResultStats(appender.getStats());
+      context.addShuffleFileOutput(0, context.getTaskId().toString());
+    }
 
-    // Collect statistics data
-    context.setResultStats(appender.getStats());
-    context.addShuffleFileOutput(0, context.getTaskId().toString());
+    appender = null;
+    plan = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index f31455f..ceeca06 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -58,6 +58,7 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
   public void close() throws IOException {
     if (child != null) {
       child.close();
+      child = null;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index b6bde94..4260c98 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -48,6 +48,7 @@ import java.net.URI;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 
@@ -58,7 +59,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   private SubQuery subQuery;
 
   private Thread schedulingThread;
-  private volatile boolean stopEventHandling;
+  private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
 
   private ScheduledRequests scheduledRequests;
   private TaskRequests taskRequests;
@@ -91,7 +92,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     this.schedulingThread = new Thread() {
       public void run() {
 
-        while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+        while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
           schedule();
           try {
             synchronized (schedulingThread){
@@ -127,7 +128,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   @Override
   public void stop() {
-    stopEventHandling = true;
+    if(stopEventHandling.getAndSet(true)){
+      return;
+    }
 
     if (schedulingThread != null) {
       synchronized (schedulingThread) {
@@ -224,6 +227,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
   @Override
   public void handleTaskRequestEvent(TaskRequestEvent event) {
+
     taskRequests.handle(event);
     int hosts = scheduledRequests.leafTaskHostMapping.size();
 
@@ -248,7 +252,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     @Override
     public void handle(TaskRequestEvent event) {
       LOG.info("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
-      if(stopEventHandling) {
+      if(stopEventHandling.get()) {
         event.getCallback().run(stopTaskRunnerReq);
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 08d080d..43e6493 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -340,10 +340,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
           "containerId=" + taskRequest.getContainerId());
       ContainerProxy container = context.getMasterContext().getResourceAllocator().
           getContainer(taskRequest.getContainerId());
-      String host = container.getTaskHostName();
+
       if(container == null) {
         continue;
       }
+
+      String host = container.getTaskHostName();
       QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
           host, taskRequest.getCallback());
       QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 39a73ba..f405ea7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -82,9 +82,6 @@ public class TajoContainerProxy extends ContainerProxy {
 
       tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
     } catch (Exception e) {
-      //TODO retry
-      RpcConnectionPool.getPool(context.getConf()).closeConnection(tajoWorkerRpc);
-      tajoWorkerRpc = null;
       LOG.error(e.getMessage(), e);
     } finally {
       RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
@@ -150,8 +147,6 @@ public class TajoContainerProxy extends ContainerProxy {
               .build(),
           NullCallback.get());
     } catch (Exception e) {
-      connPool.closeConnection(tmClient);
-      tmClient = null;
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(tmClient);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 2c194fd..7dcc55f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -50,6 +51,7 @@ import org.apache.tajo.master.metrics.WorkerResourceMetricsGaugeSet;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.util.ClassUtil;
@@ -383,10 +385,14 @@ public class TajoMaster extends CompositeService {
       }
     }
 
+    IOUtils.cleanup(LOG, catalogServer);
+
     if(systemMetrics != null) {
       systemMetrics.stop();
     }
 
+    RpcChannelFactory.shutdown();
+
     super.stop();
     LOG.info("Tajo Master main thread exiting");
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 6e2ee8b..0f8eb61 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -82,12 +82,15 @@ public class TajoMasterClientService extends AbstractService {
     // start the rpc server
     String confClientServiceAddr = conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confClientServiceAddr);
+    int workerNum = conf.getIntVar(ConfVars.MASTER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
     try {
-      server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa);
+      server = new BlockingRpcServer(TajoMasterClientProtocol.class, clientHandler, initIsa, workerNum);
     } catch (Exception e) {
       LOG.error(e);
+      throw new RuntimeException(e);
     }
     server.start();
+
     bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
     this.conf.setVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
     LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
index 7853f63..c3a0829 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -63,8 +63,9 @@ public class TajoMasterService extends AbstractService {
   public void start() {
     String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS);
     InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM);
     try {
-      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+      server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum);
     } catch (Exception e) {
       LOG.error(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
index dbec652..4f178fb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -297,17 +297,11 @@ public class YarnContainerProxy extends ContainerProxy {
     // Set the local resources
     ////////////////////////////////////////////////////////////////////////////
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    FileSystem fs = null;
-    FileContext fsCtx = null;
     LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
-    try {
-      fs = FileSystem.get(conf);
-      fsCtx = FileContext.getFileContext(conf);
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
 
     try {
+      FileSystem fs = FileSystem.get(conf);
+      FileContext fsCtx = FileContext.getFileContext(conf);
       Path systemConfPath = TajoConf.getSystemConfPath(conf);
       if (!fs.exists(systemConfPath)) {
         LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index 7c38e7a..28b5f08 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -129,7 +129,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
     }
   }
 
-  private class StopContainerRunner implements Runnable {
+  private static class StopContainerRunner implements Runnable {
     private final ContainerProxy proxy;
     private final ContainerId id;
     public StopContainerRunner(ContainerId id, ContainerProxy proxy) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2ab6c6a..2a93d3c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -93,12 +93,10 @@ public class QueryInProgress extends CompositeService {
 
   @Override
   public void stop() {
-    synchronized(stopped) {
-      if(stopped.get()) {
-        return;
-      }
-      stopped.set(true);
+    if(stopped.getAndSet(true)) {
+      return;
     }
+
     LOG.info("=========================================================");
     LOG.info("Stop query:" + queryId);
 
@@ -113,10 +111,13 @@ public class QueryInProgress extends CompositeService {
         }
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
+        break;
       }
 
       try {
-        Thread.sleep(1000);
+        synchronized (this){
+          wait(100);
+        }
       } catch (InterruptedException e) {
         break;
       }
@@ -126,11 +127,10 @@ public class QueryInProgress extends CompositeService {
       }
     }
 
-    super.stop();
-
     if(queryMasterRpc != null) {
       RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
     }
+    super.stop();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index ecf6cb2..ae6d5eb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -140,13 +140,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   @Override
   public void stop() {
-    synchronized(queryMasterStop) {
-      if(queryMasterStop.get()) {
-         return;
-      }
-
-      queryMasterStop.set(true);
-      queryMasterStop.notifyAll();
+    if(queryMasterStop.getAndSet(true)){
+      return;
     }
 
     if(queryHeartbeatThread != null) {
@@ -183,8 +178,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
         tajoWorkerProtocolService.cleanup(null, queryId.getProto(), NullCallback.get());
       } catch (Exception e) {
-        connPool.closeConnection(rpc);
-        rpc = null;
         LOG.error(e.getMessage());
       } finally {
         connPool.releaseConnection(rpc);
@@ -208,8 +201,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
       TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS);
       return workerResourcesRequest.getWorkerResourcesList();
     } catch (Exception e) {
-      connPool.closeConnection(rpc);
-      rpc = null;
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(rpc);
@@ -238,8 +229,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack);
     } catch (Exception e) {
-      connPool.closeConnection(tmClient);
-      tmClient = null;
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(tmClient);
@@ -338,9 +327,10 @@ public class QueryMaster extends CompositeService implements EventHandler {
               TajoMasterProtocol.class, true);
           TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
           masterClientService.heartbeat(future.getController(), queryHeartbeat, future);
-        } catch (Exception e) {
-          connPool.closeConnection(tmClient);
-          tmClient = null;
+        }  catch (Exception e) {
+          //this function will be closed in new thread.
+          //When tajo do stop cluster, tajo master maybe throw closed connection exception
+
           LOG.error(e.getMessage(), e);
         } finally {
           connPool.releaseConnection(tmClient);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 3618d3b..5ce57f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master.querymaster;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
@@ -67,6 +68,8 @@ public class QueryMasterManagerService extends CompositeService
 
   @Override
   public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf tajoConf = (TajoConf) conf;
     try {
       // Setup RPC server
       InetSocketAddress initIsa =
@@ -75,7 +78,8 @@ public class QueryMasterManagerService extends CompositeService
         throw new IllegalArgumentException("Failed resolve of " + initIsa);
       }
 
-      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa);
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.QUERY_MASTER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(QueryMasterProtocol.class, this, initIsa, workerNum);
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
@@ -127,7 +131,7 @@ public class QueryMasterManagerService extends CompositeService
       ContainerId cid =
           queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
 
-      if(queryMasterTask == null || queryMasterTask.isStopped()) {
+      if(queryMasterTask.isStopped()) {
         LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
         done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index e193509..eb0528c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -160,10 +160,10 @@ public class QueryMasterTask extends CompositeService {
 
   @Override
   public void stop() {
-    if(stopped.get()) {
+
+    if(stopped.getAndSet(true)) {
       return;
     }
-    stopped.set(true);
 
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
@@ -177,8 +177,6 @@ public class QueryMasterTask extends CompositeService {
       TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
       masterClientService.stopQueryMaster(null, queryId.getProto(), future);
     } catch (Exception e) {
-      connPool.closeConnection(tmClient);
-      tmClient = null;
       LOG.error(e.getMessage(), e);
     } finally {
       connPool.releaseConnection(tmClient);
@@ -249,7 +247,7 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  private class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+  private static class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
     @Override
     public void handle(QueryFinishEvent event) {
       QueryId queryId = event.getQueryId();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index b39901e..2c3572c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -323,7 +323,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
-  class WorkerResourceRequest {
+  static class WorkerResourceRequest {
     boolean queryMasterRequest;
     QueryId queryId;
     TajoMasterProtocol.WorkerResourceAllocationRequest request;
@@ -339,7 +339,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
     }
   }
 
-  class AllocatedWorkerResource {
+  static class AllocatedWorkerResource {
     WorkerResource workerResource;
     int allocatedMemoryMB;
     float allocatedDiskSlots;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index 72ada9b..80aab9b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -178,7 +178,7 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
     try {
       FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
       QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
-      if(queryInProgress != null) {
+      if(queryInProgress == null) {
         return;
       }
       TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
index 35dd6f1..9e895b8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/MetricsFileScheduledReporter.java
@@ -52,6 +52,7 @@ public class MetricsFileScheduledReporter extends MetricsStreamScheduledReporter
       this.setDateFormat(null);
     } catch (FileNotFoundException e) {
       LOG.warn("Can't open metrics file:" + fileName);
+      this.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
index c9d25c5..f11d520 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/metrics/reporter/TajoMetricsScheduledReporter.java
@@ -118,7 +118,7 @@ public abstract class TajoMetricsScheduledReporter extends TajoMetricsReporter i
     this.filter = filterList;
 
     this.period = 60;
-    if(metricsProperties != null && metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
+    if(metricsProperties.get(metricsPropertyKey + PERIOD_KEY) != null) {
       this.period = Integer.parseInt(metricsProperties.get(metricsPropertyKey + PERIOD_KEY));
     }
     afterInit();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
index 4f9f3fa..bb136f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,14 +18,13 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.*;
 
 import java.io.File;
@@ -35,8 +34,6 @@ import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.FileChannel;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 
 import static org.jboss.netty.channel.Channels.pipeline;
 
@@ -58,20 +55,9 @@ public class Fetcher {
   private long fileLen;
   private int messageReceiveCount;
 
-
-  private static final ThreadFactory bossFactory = new ThreadFactoryBuilder()
-      .setNameFormat("Fetcher Netty Boss #%d")
-      .build();
-  private static final ThreadFactory workerFactory = new ThreadFactoryBuilder()
-      .setNameFormat("Fetcher Netty Worker #%d")
-      .build();
-  private static final ChannelFactory factory = new NioClientSocketChannelFactory(
-      Executors.newCachedThreadPool(bossFactory),
-      Executors.newCachedThreadPool(workerFactory));
-
   private ClientBootstrap bootstrap;
 
-  public Fetcher(URI uri, File file) {
+  public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
     this.uri = uri;
     this.file = file;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 6d01f55..16427b6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -100,8 +100,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
                                            int memoryMBPerTask) {
     //TODO consider disk slot
     TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource();
-    int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB()/memoryMBPerTask;
-    return clusterSlots == 0 ? 1: Math.min(numTasks, clusterSlots);
+    int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
+    clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
+    return  Math.min(numTasks, clusterSlots);
   }
 
   @Override
@@ -117,10 +118,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   @Override
   public synchronized void stop() {
-    if(stopped.get()) {
+    if (stopped.getAndSet(true)) {
       return;
     }
-    stopped.set(true);
+
     executorService.shutdownNow();
 
     Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
@@ -177,7 +178,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     }
   }
 
-  protected class LaunchRunner implements Runnable {
+  protected static class LaunchRunner implements Runnable {
     private final ContainerProxy proxy;
     private final ContainerId id;
     public LaunchRunner(ContainerId id, ContainerProxy proxy) {
@@ -198,7 +199,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     }
   }
 
-  private class StopContainerRunner implements Runnable {
+  private static class StopContainerRunner implements Runnable {
     private final ContainerProxy proxy;
     private final ContainerId id;
     public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
@@ -256,8 +257,6 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
         masterClientService.allocateWorkerResources(null, request, callBack);
       } catch (Exception e) {
-        connPool.closeConnection(tmClient);
-        tmClient = null;
         LOG.error(e.getMessage(), e);
       } finally {
         connPool.releaseConnection(tmClient);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 456bd95..2f763e3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -43,6 +43,7 @@ import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.storage.v2.DiskDeviceInfo;
@@ -68,8 +69,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 
 public class TajoWorker extends CompositeService {
-  public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
-  public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+  public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
 
   public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
   public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
@@ -304,10 +305,10 @@ public class TajoWorker extends CompositeService {
 
   @Override
   public void stop() {
-    if(stopped.get()) {
+    if(stopped.getAndSet(true)) {
       return;
     }
-    stopped.set(true);
+
     if(webServer != null) {
       try {
         webServer.stop();
@@ -316,7 +317,9 @@ public class TajoWorker extends CompositeService {
       }
     }
     if(workerHeartbeatThread != null) {
-      workerHeartbeatThread.interrupt();
+      synchronized (workerHeartbeatThread){
+        workerHeartbeatThread.notifyAll();
+      }
     }
 
     if (catalogClient != null) {
@@ -324,7 +327,8 @@ public class TajoWorker extends CompositeService {
     }
 
     if(connPool != null) {
-      connPool.close();
+      connPool.shutdown();
+      RpcChannelFactory.shutdown();
     }
 
     if(webServer != null && webServer.isAlive()) {
@@ -570,7 +574,7 @@ public class TajoWorker extends CompositeService {
         clientPort = workerContext.getTajoWorkerClientService().getBindAddr().getPort();
       }
 
-      while(true) {
+      while(!stopped.get()) {
         if(sendDiskInfoCount == 0 && diskDeviceInfos != null) {
           getDiskUsageInfos();
         }
@@ -626,15 +630,15 @@ public class TajoWorker extends CompositeService {
         } catch (InterruptedException e) {
           break;
         } catch (Exception e) {
-          connPool.closeConnection(tmClient);
-          tmClient = null;
           LOG.error(e.getMessage(), e);
         } finally {
           connPool.releaseConnection(tmClient);
         }
 
         try {
-          Thread.sleep(10 * 1000);
+          synchronized (workerHeartbeatThread){
+            wait(10 * 1000);
+          }
         } catch (InterruptedException e) {
           break;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index f6e7742..da3dc34 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
@@ -29,6 +30,7 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.master.querymaster.Query;
@@ -51,7 +53,7 @@ public class TajoWorkerClientService extends AbstractService {
   private InetSocketAddress bindAddr;
   private String addr;
   private int port;
-  private Configuration conf;
+  private TajoConf conf;
   private TajoWorker.WorkerContext workerContext;
   private TajoWorkerClientProtocolServiceHandler serviceHandler;
 
@@ -64,7 +66,8 @@ public class TajoWorkerClientService extends AbstractService {
 
   @Override
   public void init(Configuration conf) {
-    this.conf = conf;
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    this.conf = (TajoConf) conf;
     this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
 
     // init RPC Server in constructor cause Heartbeat Thread use bindAddr
@@ -77,7 +80,8 @@ public class TajoWorkerClientService extends AbstractService {
       }
 
       // TODO blocking/non-blocking??
-      this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+      int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e2a7dffd/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 60b0903..c770696 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
@@ -53,6 +54,7 @@ public class TajoWorkerManagerService extends CompositeService
 
   @Override
   public void init(Configuration conf) {
+    Preconditions.checkArgument(conf instanceof TajoConf);
     TajoConf tajoConf = (TajoConf) conf;
     try {
       // Setup RPC server
@@ -62,7 +64,8 @@ public class TajoWorkerManagerService extends CompositeService
         throw new IllegalArgumentException("Failed resolve of " + initIsa);
       }
 
-      this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
+      int workerNum = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM);
+      this.rpcServer = new AsyncRpcServer(TajoWorkerProtocol.class, this, initIsa, workerNum);
       this.rpcServer.start();
 
       this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());