You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/11 07:46:12 UTC

tajo git commit: TAJO-1831: Add a shutdown hook manager in order to set priorities.

Repository: tajo
Updated Branches:
  refs/heads/master cea832aca -> 60d8d4bc8


TAJO-1831: Add a shutdown hook manager in order to set priorities.

Closes #751


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/60d8d4bc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/60d8d4bc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/60d8d4bc

Branch: refs/heads/master
Commit: 60d8d4bc80444402c2b4ad61644859d7bda8b380
Parents: cea832a
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Sep 11 14:45:11 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Sep 11 14:45:11 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/cli/tsql/TajoCli.java  |   6 +-
 .../apache/tajo/client/SessionConnection.java   |  42 ++---
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 +-
 .../apache/tajo/util/ShutdownHookManager.java   | 188 +++++++++++++++++++
 tajo-core-tests/pom.xml                         |  15 ++
 .../org/apache/tajo/client/TestTajoClient.java  |  26 +++
 .../java/org/apache/tajo/master/TajoMaster.java |   8 +-
 .../java/org/apache/tajo/worker/Fetcher.java    |   4 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  |   4 +-
 .../org/apache/tajo/rpc/NettyServerBase.java    |   2 +-
 .../java/org/apache/tajo/rpc/NettyUtils.java    | 139 ++++++++++++++
 .../org/apache/tajo/rpc/RpcChannelFactory.java  | 180 ------------------
 .../org/apache/tajo/rpc/AsyncRpcClient.java     |  11 +-
 .../org/apache/tajo/rpc/AsyncRpcServer.java     |   4 +-
 .../org/apache/tajo/rpc/BlockingRpcClient.java  |  12 +-
 .../org/apache/tajo/rpc/BlockingRpcServer.java  |   4 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  15 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   |  30 ++-
 .../java/org/apache/tajo/rpc/TestAsyncRpc.java  |   2 +-
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |   2 +-
 22 files changed, 463 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 04adaa5..2823bf8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -36,6 +36,8 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1831: Add a shutdown hook manager in order to set priorities. (jinho)
+
     TAJO-1817: Improve SQL parser error message. (hyunsik)
 
     TAJO-1825: Remove zero length fragments when file length is zero. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
index f17ec80..83763e8 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java
@@ -39,6 +39,7 @@ import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.ShutdownHookManager;
 
 import java.io.*;
 import java.lang.reflect.Constructor;
@@ -50,6 +51,7 @@ import java.util.Map;
 import java.util.TreeMap;
 
 public class TajoCli {
+  public static final int SHUTDOWN_HOOK_PRIORITY = 50;
   public static final String ERROR_PREFIX = "ERROR: ";
   public static final String KILL_PREFIX = "KILL: ";
 
@@ -373,7 +375,7 @@ public class TajoCli {
   }
 
   private void addShutdownHook() {
-    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+    ShutdownHookManager.get().addShutdownHook(new Runnable() {
       @Override
       public void run() {
         try {
@@ -382,7 +384,7 @@ public class TajoCli {
         }
         client.close();
       }
-    }));
+    }, SHUTDOWN_HOOK_PRIORITY);
   }
 
   private String updatePrompt(ParsingState state) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index b63d35b..ac0ff52 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
-import io.netty.channel.ConnectTimeoutException;
+import io.netty.channel.EventLoopGroup;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.SessionVars;
@@ -38,16 +38,14 @@ import org.apache.tajo.ipc.ClientProtos.UpdateSessionVariableRequest;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService.BlockingInterface;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
 import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.Closeable;
@@ -57,9 +55,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
 import static org.apache.tajo.exception.ReturnStateUtil.*;
@@ -70,8 +66,6 @@ public class SessionConnection implements Closeable {
 
   private final static Log LOG = LogFactory.getLog(SessionConnection.class);
 
-  private final static AtomicInteger connections = new AtomicInteger();
-
   final RpcClientManager manager;
 
   private String baseDatabase;
@@ -87,6 +81,8 @@ public class SessionConnection implements Closeable {
 
   private final ServiceTracker serviceTracker;
 
+  private final EventLoopGroup eventLoopGroup;
+
   private NettyClientBase client;
 
   private final KeyValueSet properties;
@@ -110,7 +106,13 @@ public class SessionConnection implements Closeable {
     this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
     this.userInfo = UserRoleInfo.getCurrentUser();
 
-    this.client = getTajoMasterConnection();
+    this.eventLoopGroup = NettyUtils.createEventLoopGroup(getClass().getSimpleName(), 4);
+    try {
+      this.client = getTajoMasterConnection();
+    } catch (TajoRuntimeException e) {
+      NettyUtils.shutdown(eventLoopGroup);
+      throw e;
+    }
   }
 
   public Map<String, String> getClientSideSessionVars() {
@@ -127,16 +129,8 @@ public class SessionConnection implements Closeable {
         RpcClientManager.cleanup(client);
 
         // Client do not closed on idle state for support high available
-        this.client = manager.newClient(
-            getTajoMasterAddr(),
-            TajoMasterClientProtocol.class,
-            false,
-            manager.getRetries(),
-            0,
-            TimeUnit.SECONDS,
-            false);
-        connections.incrementAndGet();
-
+        this.client = manager.newBlockingClient(getTajoMasterAddr(), TajoMasterClientProtocol.class,
+            manager.getRetries(), eventLoopGroup);
       } catch (Throwable t) {
         throw new TajoRuntimeException(new ClientConnectionException(t));
       }
@@ -346,14 +340,7 @@ public class SessionConnection implements Closeable {
       // ignore
     } finally {
       RpcClientManager.cleanup(client);
-      if(connections.decrementAndGet() == 0) {
-        if (!System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equals(CommonTestingUtil.TAJO_TEST_TRUE)) {
-          RpcChannelFactory.shutdownGracefully();
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RPC connection is closed");
-          }
-        }
-      }
+      NettyUtils.shutdown(eventLoopGroup);
     }
   }
 
@@ -457,5 +444,4 @@ public class SessionConnection implements Closeable {
     }
     return builder.build();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 909f266..0f393f6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -208,7 +208,7 @@ public class TajoConf extends Configuration {
     SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
         2, Validators.min("1")),
     SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  8192),
-    SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 10, Validators.min("1")),
+    SHUFFLE_FETCHER_CONNECT_TIMEOUT("tajo.shuffle.fetcher.connect.timeout-sec", 60, Validators.min("1")),
     SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 60, Validators.min("1")),
     SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 2, Validators.min("0")),
     SHUFFLE_HASH_APPENDER_BUFFER_SIZE("tajo.shuffle.hash.appender.buffer.size", 10000),

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
new file mode 100644
index 0000000..3ec535f
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ShutdownHookManager.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The <code>ShutdownHookManager</code> enables running shutdownHook
+ * in a deterministic order, higher priority first.
+ * <p/>
+ * The JVM runs ShutdownHooks in a non-deterministic order or in parallel.
+ * This class registers a single JVM shutdownHook and run all the
+ * shutdownHooks registered to it (to this class) in order based on their
+ * priority.
+ *
+ * this is an implementation copied from hadoop-common
+ */
+public class ShutdownHookManager {
+
+  private static final ShutdownHookManager MGR = new ShutdownHookManager();
+
+  private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
+
+  static {
+    Runtime.getRuntime().addShutdownHook(
+        new Thread() {
+          @Override
+          public void run() {
+            MGR.shutdownInProgress.set(true);
+            for (Runnable hook: MGR.getShutdownHooksInOrder()) {
+              try {
+                hook.run();
+              } catch (Throwable ex) {
+                LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
+                    "' failed, " + ex.toString(), ex);
+              }
+            }
+          }
+        }
+    );
+  }
+
+  /**
+   * Return <code>ShutdownHookManager</code> singleton.
+   *
+   * @return <code>ShutdownHookManager</code> singleton.
+   */
+  public static ShutdownHookManager get() {
+    return MGR;
+  }
+
+  /**
+   * Private structure to store ShutdownHook and its priority.
+   */
+  private static class HookEntry {
+    Runnable hook;
+    int priority;
+
+    public HookEntry(Runnable hook, int priority) {
+      this.hook = hook;
+      this.priority = priority;
+    }
+
+    @Override
+    public int hashCode() {
+      return hook.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      boolean eq = false;
+      if (obj != null) {
+        if (obj instanceof HookEntry) {
+          eq = (hook == ((HookEntry)obj).hook);
+        }
+      }
+      return eq;
+    }
+
+  }
+
+  private Set<HookEntry> hooks =
+      Collections.synchronizedSet(new HashSet<HookEntry>());
+
+  private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
+
+  //private to constructor to ensure singularity
+  private ShutdownHookManager() {
+  }
+
+  /**
+   * Returns the list of shutdownHooks in order of execution,
+   * Highest priority first.
+   *
+   * @return the list of shutdownHooks in order of execution.
+   */
+  List<Runnable> getShutdownHooksInOrder() {
+    List<HookEntry> list;
+    synchronized (MGR.hooks) {
+      list = new ArrayList<HookEntry>(MGR.hooks);
+    }
+    Collections.sort(list, new Comparator<HookEntry>() {
+
+      //reversing comparison so highest priority hooks are first
+      @Override
+      public int compare(HookEntry o1, HookEntry o2) {
+        return o2.priority - o1.priority;
+      }
+    });
+    List<Runnable> ordered = new ArrayList<Runnable>();
+    for (HookEntry entry: list) {
+      ordered.add(entry.hook);
+    }
+    return ordered;
+  }
+
+  /**
+   * Adds a shutdownHook with a priority, the higher the priority
+   * the earlier will run. ShutdownHooks with same priority run
+   * in a non-deterministic order.
+   *
+   * @param shutdownHook shutdownHook <code>Runnable</code>
+   * @param priority priority of the shutdownHook.
+   */
+  public void addShutdownHook(Runnable shutdownHook, int priority) {
+    if (shutdownHook == null) {
+      throw new IllegalArgumentException("shutdownHook cannot be NULL");
+    }
+    if (shutdownInProgress.get()) {
+      throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
+    }
+    hooks.add(new HookEntry(shutdownHook, priority));
+  }
+
+  /**
+   * Removes a shutdownHook.
+   *
+   * @param shutdownHook shutdownHook to remove.
+   * @return TRUE if the shutdownHook was registered and removed,
+   * FALSE otherwise.
+   */
+  public boolean removeShutdownHook(Runnable shutdownHook) {
+    if (shutdownInProgress.get()) {
+      throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
+    }
+    return hooks.remove(new HookEntry(shutdownHook, 0));
+  }
+
+  /**
+   * Indicates if a shutdownHook is registered or not.
+   *
+   * @param shutdownHook shutdownHook to check if registered.
+   * @return TRUE/FALSE depending if the shutdownHook is is registered.
+   */
+  public boolean hasShutdownHook(Runnable shutdownHook) {
+    return hooks.contains(new HookEntry(shutdownHook, 0));
+  }
+
+  /**
+   * Indicates if shutdown is in progress or not.
+   *
+   * @return TRUE if the shutdown is in progress, otherwise FALSE.
+   */
+  public boolean isShutdownInProgress() {
+    return shutdownInProgress.get();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index 8199f46..20b7378 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -335,6 +335,21 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
index efadc7a..38819f1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -40,6 +40,7 @@ import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
 import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
 import org.apache.tajo.ipc.ClientProtos.StageHistoryProto;
+import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -47,6 +48,7 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.powermock.reflect.Whitebox;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -771,4 +773,28 @@ public class TestTajoClient {
     assertEquals(1, taskHistories.get(1).getTotalReadRows());
     assertEquals(1, taskHistories.get(1).getTotalWriteRows());
   }
+
+  @Test
+  public void testClientRPCInterference() throws Exception {
+    TajoClient client = cluster.newTajoClient();
+    TajoClient client2 = cluster.newTajoClient();
+
+
+    NettyClientBase rpcClient = Whitebox.getInternalState(client, NettyClientBase.class);
+    assertNotNull(rpcClient);
+
+    NettyClientBase rpcClient2 = Whitebox.getInternalState(client2, NettyClientBase.class);
+    assertNotNull(rpcClient);
+
+    assertNotEquals(rpcClient.getChannel().eventLoop(), rpcClient2.getChannel().eventLoop());
+
+    client.close();
+    client2.close();
+
+    rpcClient.getChannel().eventLoop().terminationFuture().sync();
+    assertTrue(rpcClient.getChannel().eventLoop().isTerminated());
+
+    rpcClient2.getChannel().eventLoop().terminationFuture().sync();
+    assertTrue(rpcClient2.getChannel().eventLoop().isTerminated());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 754df7f..1197e98 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -49,7 +49,6 @@ import org.apache.tajo.master.rm.TajoResourceManager;
 import org.apache.tajo.metrics.ClusterResourceMetricSet;
 import org.apache.tajo.metrics.Master;
 import org.apache.tajo.plan.function.python.PythonScriptEngine;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rule.EvaluationContext;
@@ -86,6 +85,8 @@ public class TajoMaster extends CompositeService {
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(TajoMaster.class);
 
+  public static final int SHUTDOWN_HOOK_PRIORITY = 10;
+
   /** rw-r--r-- */
   @SuppressWarnings("OctalInteger")
   final public static FsPermission TAJO_ROOT_DIR_PERMISSION = FsPermission.createImmutable((short) 0755);
@@ -162,7 +163,7 @@ public class TajoMaster extends CompositeService {
   public void serviceInit(Configuration conf) throws Exception {
 
     this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
-    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+    ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
 
     context = new MasterContext(systemConf);
     clock = new SystemClock();
@@ -547,12 +548,13 @@ public class TajoMaster extends CompositeService {
             && AbstractDBStore.needShutdown(catalogServer.getStoreUri())) {
           DerbyStore.shutdown();
         }
-        RpcChannelFactory.shutdownGracefully();
+        RpcClientManager.shutdown();
       }
     }
   }
 
   public static void main(String[] args) throws Exception {
+    Thread.setDefaultUncaughtExceptionHandler(new TajoUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index ff85a4b..762278b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -89,7 +89,7 @@ public class Fetcher {
     if (!useLocalFile) {
       bootstrap = new Bootstrap()
           .group(
-              RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER,
+              NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER,
                   conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM)))
           .channel(NioSocketChannel.class)
           .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 65a9511..fbb8d54 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -43,7 +43,6 @@ import org.apache.tajo.plan.function.python.PythonScriptEngine;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.QueryMaster;
 import org.apache.tajo.querymaster.QueryMasterManagerService;
-import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
@@ -78,6 +77,7 @@ public class TajoWorker extends CompositeService {
   public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
   public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
   public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
+  public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 
@@ -147,7 +147,7 @@ public class TajoWorker extends CompositeService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+    ShutdownHookManager.get().addShutdownHook(new ShutdownHook(), SHUTDOWN_HOOK_PRIORITY);
 
     this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     RackResolver.init(systemConf);
@@ -571,7 +571,7 @@ public class TajoWorker extends CompositeService {
         LOG.info("TajoWorker received SIGINT Signal");
         LOG.info("============================================");
         stop();
-        RpcChannelFactory.shutdownGracefully();
+        RpcClientManager.shutdown();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index 29cf719..59a758f 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -58,7 +58,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
+import org.apache.tajo.rpc.NettyUtils;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
@@ -190,7 +190,7 @@ public class TajoPullServerService extends AbstractService {
       int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num",
           Runtime.getRuntime().availableProcessors() * 2);
 
-      selector = RpcChannelFactory.createServerChannelFactory("TajoPullServerService", workerNum)
+      selector = NettyUtils.createServerBootstrap("TajoPullServerService", workerNum)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.TCP_NODELAY, true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
index ad443d7..2c154bf 100644
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyServerBase.java
@@ -79,7 +79,7 @@ public class NettyServerBase {
       listener.onBeforeInit(this);
     }
     
-    bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum);
+    bootstrap = NettyUtils.createServerBootstrap(serviceName, workerNum);
 
     this.initializer = initializer;
     bootstrap

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
new file mode 100644
index 0000000..01fd48b
--- /dev/null
+++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/NettyUtils.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.rpc;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public final class NettyUtils {
+  private static final Log LOG = LogFactory.getLog(NettyUtils.class);
+  
+  private static final int DEFAULT_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2;
+
+  private static final Object lockObjectForLoopGroup = new Object();
+  private static AtomicInteger serverCount = new AtomicInteger(0);
+
+  public enum GROUP {
+    DEFAULT,
+    FETCHER
+  }
+
+  private static final Map<GROUP, EventLoopGroup> eventLoopGroupMap =
+      new ConcurrentHashMap<GROUP, EventLoopGroup>();
+
+  private NettyUtils(){
+  }
+
+  /**
+   * Get default EventLoopGroup of netty’s. servers and clients can shared it.
+   */
+  public static EventLoopGroup getDefaultEventLoopGroup() {
+    return getSharedEventLoopGroup(GROUP.DEFAULT, DEFAULT_THREAD_NUM);
+  }
+
+  /**
+   * Get EventLoopGroup of netty’s.
+   *
+   * @param clientId
+   * @param threads
+   * @return A EventLoopGroup by key
+   */
+  public static EventLoopGroup getSharedEventLoopGroup(GROUP clientId, int threads) {
+    EventLoopGroup returnEventLoopGroup;
+
+    synchronized (lockObjectForLoopGroup) {
+      if (!eventLoopGroupMap.containsKey(clientId)) {
+        eventLoopGroupMap.put(clientId, createEventLoopGroup(clientId.name(), threads));
+      }
+
+      returnEventLoopGroup = eventLoopGroupMap.get(clientId);
+      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
+        returnEventLoopGroup = createEventLoopGroup(clientId.name(), threads);
+        eventLoopGroupMap.put(clientId, returnEventLoopGroup);
+      }
+    }
+
+    return returnEventLoopGroup;
+  }
+
+  public static EventLoopGroup createEventLoopGroup(String name, int threads) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Create " + name + " EventLoopGroup. threads:" + threads);
+    }
+
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    ThreadFactory clientFactory = builder.setNameFormat(name + " #%d").build();
+
+    return createEventLoopGroup(threads, clientFactory);
+  }
+
+  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
+    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
+  }
+
+  private static EventLoopGroup createEventLoopGroup(int threads, ThreadFactory factory) {
+    return new NioEventLoopGroup(threads, factory);
+  }
+
+  /**
+   * Server must release the external resources
+   */
+  public static ServerBootstrap createServerBootstrap(String name, int threads) {
+    name = name + "-" + serverCount.incrementAndGet();
+
+    EventLoopGroup eventLoopGroup = createEventLoopGroup(name, threads);
+    return new ServerBootstrap().group(eventLoopGroup, eventLoopGroup);
+  }
+
+  public static void shutdownGracefully() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Shutdown Shared RPC Pool");
+    }
+    synchronized (lockObjectForLoopGroup) {
+      for (EventLoopGroup eventLoopGroup : eventLoopGroupMap.values()) {
+        try {
+          shutdown(eventLoopGroup).sync();
+        } catch (InterruptedException e) {
+          //ignore
+        }
+      }
+      eventLoopGroupMap.clear();
+    }
+  }
+
+  public static io.netty.util.concurrent.Future shutdown(EventLoopGroup eventLoopGroup) {
+    if (eventLoopGroup != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Shutdown EventLoopGroup :" + eventLoopGroup.toString());
+      }
+
+      return eventLoopGroup.shutdownGracefully();
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
deleted file mode 100644
index eb34ca2..0000000
--- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public final class RpcChannelFactory {
-  private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class);
-  
-  private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2;
-
-  private static final Object lockObjectForLoopGroup = new Object();
-  private static AtomicInteger serverCount = new AtomicInteger(0);
-
-  public enum ClientChannelId {
-    CLIENT_DEFAULT,
-    FETCHER
-  }
-
-  private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount =
-      new ConcurrentHashMap<ClientChannelId, Integer>();
-  private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool =
-      new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>();
-
-  private RpcChannelFactory(){
-  }
-  
-  static {
-    Runtime.getRuntime().addShutdownHook(new CleanUpHandler());
-
-    defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1);
-    defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1);
-  }
-
-  /**
-  * make this factory static thus all clients can share its thread pool.
-  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-  */
-  public static EventLoopGroup getSharedClientEventloopGroup() {
-    return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM);
-  }
-  
-  /**
-  * make this factory static thus all clients can share its thread pool.
-  * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe
-  *
-  * @param workerNum The number of workers
-  */
-  public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){
-    //shared woker and boss pool
-    return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum);
-  }
-
-  /**
-   * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput.
-   *
-   * @param clientId
-   * @param workerNum
-   * @return
-   */
-  public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) {
-    Queue<EventLoopGroup> eventLoopGroupQueue;
-    EventLoopGroup returnEventLoopGroup;
-
-    synchronized (lockObjectForLoopGroup) {
-      eventLoopGroupQueue = eventLoopGroupPool.get(clientId);
-      if (eventLoopGroupQueue == null) {
-        eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum);
-      }
-
-      returnEventLoopGroup = eventLoopGroupQueue.poll();
-      if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) {
-        returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum);
-      }
-      eventLoopGroupQueue.add(returnEventLoopGroup);
-    }
-
-    return returnEventLoopGroup;
-  }
-
-  protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) {
-    return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown());
-  }
-
-  // Client must release the external resources
-  protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) {
-    int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId);
-    Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>();
-    eventLoopGroupPool.put(clientId, loopGroupQueue);
-
-    for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) {
-      loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum));
-    }
-
-    return loopGroupQueue;
-  }
-
-  protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum);
-    }
-
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build();
-
-    return new NioEventLoopGroup(workerNum, clientFactory);
-  }
-
-  // Client must release the external resources
-  public static ServerBootstrap createServerChannelFactory(String name, int workerNum) {
-    name = name + "-" + serverCount.incrementAndGet();
-    if(LOG.isInfoEnabled()){
-      LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum);
-    }
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build();
-    ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build();
-    
-    EventLoopGroup bossGroup =
-        new NioEventLoopGroup(1, bossFactory);
-    EventLoopGroup workerGroup = 
-        new NioEventLoopGroup(workerNum, workerFactory);
-    
-    return new ServerBootstrap().group(bossGroup, workerGroup);
-  }
-
-  public static void shutdownGracefully(){
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Shutdown Shared RPC Pool");
-    }
-
-    synchronized(lockObjectForLoopGroup) {
-      for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) {
-        for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) {
-          eventLoopGroup.shutdownGracefully();
-        }
-
-        eventLoopGroupQueue.clear();
-      }
-      eventLoopGroupPool.clear();
-    }
-  }
-  
-  static class CleanUpHandler extends Thread {
-
-    @Override
-    public void run() {
-      RpcChannelFactory.shutdownGracefully();
-    }
-    
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
index dd7d495..6fb62d4 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java
@@ -18,9 +18,11 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.*;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.EventLoopGroup;
 import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
@@ -34,9 +36,10 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
   private final ProxyRpcChannel rpcChannel;
   private final NettyChannelInboundHandler handler;
 
+  @VisibleForTesting
   AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
       throws ClassNotFoundException, NoSuchMethodException {
-    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
+    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
   }
 
   /**
@@ -49,10 +52,12 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
    *                         otherwise it is request timeout on active-state
    * @param timeUnit         TimeUnit
    * @param enablePing       enable to detect remote peer hangs
+   * @param eventLoopGroup   thread pool of netty's
    * @throws ClassNotFoundException
    * @throws NoSuchMethodException
    */
-  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing)
+  AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
+                 EventLoopGroup eventLoopGroup)
       throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
 
@@ -62,7 +67,7 @@ public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallb
     init(new ProtoClientChannelInitializer(handler,
         RpcResponse.getDefaultInstance(),
         timeUnit.toNanos(timeout),
-        enablePing));
+        enablePing), eventLoopGroup);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
index 88ffaf6..0e12c53 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java
@@ -42,7 +42,7 @@ public class AsyncRpcServer extends NettyServerBase {
   public AsyncRpcServer(final Class<?> protocol,
                         final Object instance,
                         final InetSocketAddress bindAddress,
-                        final int workerNum)
+                        final int threads)
       throws Exception {
     super(protocol.getSimpleName(), bindAddress);
 
@@ -54,7 +54,7 @@ public class AsyncRpcServer extends NettyServerBase {
     this.service = (Service) method.invoke(null, instance);
 
     this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
-    super.init(this.initializer, workerNum);
+    super.init(this.initializer, threads);
   }
 
   @ChannelHandler.Sharable

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
index 349a0a0..4327003 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java
@@ -18,9 +18,11 @@
 
 package org.apache.tajo.rpc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.*;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.EventLoopGroup;
 import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey;
 import org.apache.tajo.rpc.RpcProtos.RpcResponse;
 
@@ -35,9 +37,10 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
   private final ProxyRpcChannel rpcChannel;
   private final NettyChannelInboundHandler handler;
 
+  @VisibleForTesting
   BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries)
       throws NoSuchMethodException, ClassNotFoundException {
-    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false);
+    this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup());
   }
 
   /**
@@ -50,11 +53,12 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
    *                         otherwise it is request timeout on active-state
    * @param timeUnit         TimeUnit
    * @param enablePing       enable to detect remote peer hangs
+   * @param eventLoopGroup   thread pool of netty's
    * @throws ClassNotFoundException
    * @throws NoSuchMethodException
    */
-  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit,
-                    boolean enablePing) throws ClassNotFoundException, NoSuchMethodException {
+  BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing,
+                    EventLoopGroup eventLoopGroup) throws ClassNotFoundException, NoSuchMethodException {
     super(rpcConnectionKey, retries);
 
     this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class);
@@ -63,7 +67,7 @@ public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCa
     init(new ProtoClientChannelInitializer(handler,
         RpcResponse.getDefaultInstance(),
         timeUnit.toNanos(timeout),
-        enablePing));
+        enablePing), eventLoopGroup);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
index 007ada5..3f538bb 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java
@@ -39,7 +39,7 @@ public class BlockingRpcServer extends NettyServerBase {
   public BlockingRpcServer(final Class<?> protocol,
                            final Object instance,
                            final InetSocketAddress bindAddress,
-                           final int workerNum)
+                           final int threads)
       throws Exception {
 
     super(protocol.getSimpleName(), bindAddress);
@@ -55,7 +55,7 @@ public class BlockingRpcServer extends NettyServerBase {
     this.service = (BlockingService) method.invoke(null, instance);
     this.initializer = new ProtoServerChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance());
 
-    super.init(this.initializer, workerNum);
+    super.init(this.initializer, threads);
   }
 
   @ChannelHandler.Sharable

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 5f76bfc..c6d90ed 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -39,7 +39,6 @@ import java.lang.reflect.Method;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.net.UnknownHostException;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Collection;
 import java.util.concurrent.ConcurrentHashMap;
@@ -66,10 +65,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
   }
 
   // should be called from sub class
-  protected void init(ChannelInitializer<Channel> initializer) {
+  protected void init(ChannelInitializer<Channel> initializer, EventLoopGroup eventLoopGroup) {
     this.bootstrap = new Bootstrap();
     this.bootstrap
-        .group(RpcChannelFactory.getSharedClientEventloopGroup())
+        .group(eventLoopGroup)
         .channel(NioSocketChannel.class)
         .handler(initializer)
         .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
@@ -117,6 +116,11 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
    */
   protected void invoke(final RpcProtos.RpcRequest rpcRequest, final T callback, final int retry) {
 
+    if(getChannel().eventLoop().isShuttingDown()) {
+      LOG.warn("RPC is shutting down");
+      return;
+    }
+
     ChannelPromise promise = getChannel().newPromise();
     promise.addListener(new GenericFutureListener<ChannelFuture>() {
 
@@ -197,6 +201,11 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
       if (maxRetries > retries) {
         retries++;
 
+        if(getChannel().eventLoop().isShuttingDown()) {
+          LOG.warn("RPC is shutting down");
+          return;
+        }
+
         LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + "\nTry to reconnect : " + getKey().addr);
         try {
           Thread.sleep(RpcConstants.DEFAULT_PAUSE);

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index 111754e..aa7ba67 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.rpc;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.internal.logging.CommonsLoggerFactory;
@@ -64,12 +65,22 @@ public class RpcClientManager {
                                                    long timeout,
                                                    TimeUnit timeUnit,
                                                    boolean enablePing)
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+    return makeClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, NettyUtils.getDefaultEventLoopGroup());
+  }
+
+  private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey,
+                                                   int retries,
+                                                   long timeout,
+                                                   TimeUnit timeUnit,
+                                                   boolean enablePing,
+                                                   EventLoopGroup eventLoopGroup)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException {
     NettyClientBase client;
     if (rpcConnectionKey.asyncMode) {
-      client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
+      client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
     } else {
-      client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing);
+      client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup);
     }
     return (T) client;
   }
@@ -152,6 +163,19 @@ public class RpcClientManager {
     return client;
   }
 
+  public synchronized <T extends NettyClientBase> T newBlockingClient(InetSocketAddress addr,
+                                                                      Class<?> protocolClass,
+                                                                      int retries,
+                                                                      EventLoopGroup eventLoopGroup)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+    T client = makeClient(new RpcConnectionKey(addr, protocolClass, false),
+        retries, 0, TimeUnit.SECONDS, false, eventLoopGroup);
+    client.connect();
+    assert client.isConnected();
+    return client;
+  }
+
   /**
    * Request to close this clients
    * After it is closed, it is removed from clients map.
@@ -174,7 +198,7 @@ public class RpcClientManager {
    */
   public static void shutdown() {
     close();
-    RpcChannelFactory.shutdownGracefully();
+    NettyUtils.shutdownGracefully();
   }
 
   protected static boolean contains(RpcConnectionKey key) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
index 18c7d80..4f17476 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java
@@ -139,7 +139,7 @@ public class TestAsyncRpc {
 
   @AfterClass
   public static void tearDownClass() throws Exception {
-    RpcChannelFactory.shutdownGracefully();
+    RpcClientManager.shutdown();
   }
 
   public void tearDownRpcServer() throws Exception {

http://git-wip-us.apache.org/repos/asf/tajo/blob/60d8d4bc/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 9f95f58..0fae7ee 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -130,7 +130,7 @@ public class TestBlockingRpc {
 
   @AfterClass
   public static void tearDownClass() throws Exception {
-    RpcChannelFactory.shutdownGracefully();
+    RpcClientManager.shutdown();
   }
 
   public void tearDownRpcServer() throws Exception {