You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:00 UTC

[1/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/index_support 86c97b2a1 -> 42bcf2de0


http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index be757af..84decd5 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -32,19 +32,18 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
-import org.apache.tajo.rpc.ServerCallable;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
@@ -53,7 +52,7 @@ import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProto
 
 public class SessionConnection implements Closeable {
 
-  private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
+  private final static Log LOG = LogFactory.getLog(SessionConnection.class);
 
   final RpcClientManager manager;
 
@@ -70,6 +69,8 @@ public class SessionConnection implements Closeable {
 
   private ServiceTracker serviceTracker;
 
+  private NettyClientBase client;
+
   private KeyValueSet properties;
 
   /**
@@ -88,27 +89,34 @@ public class SessionConnection implements Closeable {
 
     this.manager = RpcClientManager.getInstance();
     this.manager.setRetries(properties.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES));
-    this.manager.setTimeoutSeconds(
-        properties.getInt(RpcConstants.RPC_CLIENT_TIMEOUT_SECS, 0)); // disable rpc timeout
-
     this.userInfo = UserRoleInfo.getCurrentUser();
     this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
     this.serviceTracker = tracker;
+    try {
+      this.client = getTajoMasterConnection();
+    } catch (ServiceException e) {
+      throw new IOException(e);
+    }
   }
 
   public Map<String, String> getClientSideSessionVars() {
     return Collections.unmodifiableMap(sessionVarsCache);
   }
 
-  public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
-      ConnectException, ClassNotFoundException {
-    return manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
-  }
-
-  public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    return manager.getClient(addr, protocolClass, asyncMode);
+  public synchronized NettyClientBase getTajoMasterConnection() throws ServiceException {
+    if (client != null && client.isConnected()) return client;
+    else {
+      try {
+        RpcClientManager.cleanup(client);
+        // The client is not closed when idle, in order to support high availability
+        this.client = manager.newClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false,
+            manager.getRetries(), 0, TimeUnit.SECONDS, false);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      return client;
+    }
   }
 
   protected KeyValueSet getProperties() {
@@ -129,10 +137,9 @@ public class SessionConnection implements Closeable {
   }
 
   public boolean isConnected() {
-    if(!closed.get()){
+    if (!closed.get()) {
       try {
-        return manager.getClient(serviceTracker.getClientServiceAddress(),
-            TajoMasterClientProtocol.class, false).isConnected();
+        return getTajoMasterConnection().isConnected();
       } catch (Throwable e) {
         return false;
       }
@@ -145,64 +152,51 @@ public class SessionConnection implements Closeable {
   }
 
   public String getCurrentDatabase() throws ServiceException {
-    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
   }
 
   public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Map<String, String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        KeyValueSet keyValueSet = new KeyValueSet();
-        keyValueSet.putAll(variables);
-        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .setSessionVars(keyValueSet.getProto()).build();
-
-        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
-        if (response.getResultCode() == ResultCode.OK) {
-          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return Collections.unmodifiableMap(sessionVarsCache);
-        } else {
-          throw new ServiceException(response.getMessage());
-        }
-      }
-    }.withRetries();
-  }
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-  public Map<String, String> unsetSessionVariables(final List<String> variables)  throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    KeyValueSet keyValueSet = new KeyValueSet();
+    keyValueSet.putAll(variables);
+    ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+        .setSessionId(sessionId)
+        .setSessionVars(keyValueSet.getProto()).build();
 
-      public Map<String, String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .addAllUnsetVariables(variables).build();
+    if (response.getResultCode() == ResultCode.OK) {
+      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return Collections.unmodifiableMap(sessionVarsCache);
+    } else {
+      throw new ServiceException(response.getMessage());
+    }
+  }
 
-        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+  public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        if (response.getResultCode() == ResultCode.OK) {
-          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return Collections.unmodifiableMap(sessionVarsCache);
-        } else {
-          throw new ServiceException(response.getMessage());
-        }
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+        .setSessionId(sessionId)
+        .addAllUnsetVariables(variables).build();
+
+    SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
+    if (response.getResultCode() == ResultCode.OK) {
+      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return Collections.unmodifiableMap(sessionVarsCache);
+    } else {
+      throw new ServiceException(response.getMessage());
+    }
   }
 
   void updateSessionVarsCache(Map<String, String> variables) {
@@ -213,35 +207,26 @@ public class SessionConnection implements Closeable {
   }
 
   public String getSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<String>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-
-        synchronized (sessionVarsCache) {
-          // If a desired variable is client side one and exists in the cache, immediately return the variable.
-          if (sessionVarsCache.containsKey(varname)) {
-            return sessionVarsCache.get(varname);
-          }
-        }
+    synchronized (sessionVarsCache) {
+      // If a desired variable is client side one and exists in the cache, immediately return the variable.
+      if (sessionVarsCache.containsKey(varname)) {
+        return sessionVarsCache.get(varname);
+      }
+    }
 
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
   }
 
   public Boolean existSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
   }
 
   public Map<String, String> getCachedAllSessionVariables() {
@@ -251,29 +236,19 @@ public class SessionConnection implements Closeable {
   }
 
   public Map<String, String> getAllSessionVariables() throws ServiceException {
-    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class,
-        false) {
-
-      public Map<String, String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    return ProtoUtil.convertToMap(tajoMasterService.getAllSessionVariables(null, sessionId));
   }
 
   public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    Boolean selected = new ServerCallable<Boolean>(manager, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = getTajoMasterConnection();
+    checkSessionAndGet(client);
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    boolean selected = tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
 
     if (selected) {
       this.baseDatabase = databaseName;
@@ -283,14 +258,14 @@ public class SessionConnection implements Closeable {
 
   @Override
   public void close() {
-    if(closed.getAndSet(true)){
+    if (closed.getAndSet(true)) {
       return;
     }
 
     // remove session
     NettyClientBase client = null;
     try {
-      client = manager.getClient(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+      client = getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
       tajoMaster.removeSession(null, sessionId);
     } catch (Throwable e) {
@@ -333,55 +308,51 @@ public class SessionConnection implements Closeable {
   }
 
   public boolean reconnect() throws Exception {
-    return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
-        builder.setUsername(userInfo.getUserName()).build();
-        if (baseDatabase != null) {
-          builder.setBaseDatabaseName(baseDatabase);
-        }
+    CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+    builder.setUsername(userInfo.getUserName()).build();
+    if (baseDatabase != null) {
+      builder.setBaseDatabaseName(baseDatabase);
+    }
 
+    NettyClientBase client = getTajoMasterConnection();
 
-        // create new session
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
-        if (response.getResultCode() != ResultCode.OK) {
-          return false;
-        }
+    // create new session
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
+    if (response.getResultCode() != ResultCode.OK) {
+      return false;
+    }
 
-        // Invalidate some session variables in client cache
-        sessionId = response.getSessionId();
-        Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
-        synchronized (sessionVarsCache) {
-          for (SessionVars var : UPDATE_ON_RECONNECT) {
-            String value = sessionVars.get(var.keyname());
-            if (value != null) {
-              sessionVarsCache.put(var.keyname(), value);
-            }
-          }
+    // Invalidate some session variables in client cache
+    sessionId = response.getSessionId();
+    Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+    synchronized (sessionVarsCache) {
+      for (SessionVars var : UPDATE_ON_RECONNECT) {
+        String value = sessionVars.get(var.keyname());
+        if (value != null) {
+          sessionVarsCache.put(var.keyname(), value);
         }
+      }
+    }
 
-        // Update the session variables in server side
-        try {
-          KeyValueSet keyValueSet = new KeyValueSet();
-          keyValueSet.putAll(sessionVarsCache);
-          ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-              .setSessionId(sessionId)
-              .setSessionVars(keyValueSet.getProto()).build();
-
-          if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
-            tajoMasterService.removeSession(null, sessionId);
-            return false;
-          }
-          LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
-          return true;
-        } catch (ServiceException e) {
-          tajoMasterService.removeSession(null, sessionId);
-          return false;
-        }
+    // Update the session variables in server side
+    try {
+      KeyValueSet keyValueSet = new KeyValueSet();
+      keyValueSet.putAll(sessionVarsCache);
+      ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+          .setSessionId(sessionId)
+          .setSessionVars(keyValueSet.getProto()).build();
+
+      if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
+        tajoMasterService.removeSession(null, sessionId);
+        return false;
       }
-    }.withRetries();
+      LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+      return true;
+    } catch (ServiceException e) {
+      tajoMasterService.removeSession(null, sessionId);
+      return false;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
index df709c5..0bb11e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestDefaultCliOutputFormatter.java
@@ -88,7 +88,6 @@ public class TestDefaultCliOutputFormatter {
     String multiLineMessage =
         "ERROR: java.sql.SQLException: ERROR: no such a table: table1\n" +
         "com.google.protobuf.ServiceException: java.sql.SQLException: ERROR: no such a table: table1\n" +
-        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:107)\n" +
         "\tat org.apache.tajo.client.TajoClient.getTableDesc(TajoClient.java:777)\n" +
         "\tat org.apache.tajo.cli.tsql.commands.DescTableCommand.invoke(DescTableCommand.java:43)\n" +
         "\tat org.apache.tajo.cli.tsql.TajoCli.executeMetaCommand(TajoCli.java:300)\n" +
@@ -96,9 +95,6 @@ public class TestDefaultCliOutputFormatter {
         "\tat org.apache.tajo.cli.tsql.TajoCli.runShell(TajoCli.java:271)\n" +
         "\tat org.apache.tajo.cli.tsql.TajoCli.main(TajoCli.java:420)\n" +
         "Caused by: java.sql.SQLException: ERROR: no such a table: table1\n" +
-        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:791)\n" +
-        "\tat org.apache.tajo.client.TajoClient$22.call(TajoClient.java:778)\n" +
-        "\tat org.apache.tajo.rpc.ServerCallable.withRetries(ServerCallable.java:97)\n" +
         "\t... 6 more";
 
     assertEquals("ERROR: no such a table: table1", DefaultTajoCliOutputFormatter.parseErrorMessage(multiLineMessage));

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index b2e1ce9..b1a27fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -34,10 +34,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.master.event.StageEvent;
-import org.apache.tajo.master.event.StageEventType;
+import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -131,8 +128,7 @@ public class TestKillQuery {
     assertNotNull(stage);
 
     // fire kill event
-    Query q = queryMasterTask.getQuery();
-    q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
 
     try {
       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
@@ -157,24 +153,55 @@ public class TestKillQuery {
   @Test
   public final void testIgnoreStageStateFromKilled() throws Exception {
 
-    ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
-    QueryId queryId = new QueryId(res.getQueryId());
-    cluster.waitForQuerySubmitted(queryId);
+    SQLAnalyzer analyzer = new SQLAnalyzer();
+    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+    Session session = LocalTajoTestingUtility.createDummySession();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr =  analyzer.parse(queryStr);
+    LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
 
-    QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
-    Query query = qmt.getQuery();
+    CountDownLatch barrier  = new CountDownLatch(1);
+    MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+    QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+    QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+        queryId, session, defaultContext, expr.toJson(), dispatch);
 
-    // wait for a stage created
-    cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
-    query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    queryMasterTask.init(conf);
+    queryMasterTask.getQueryTaskContext().getDispatcher().start();
+    queryMasterTask.startQuery();
 
     try{
-      cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
-    } finally {
-      assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+      barrier.await(5000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+    }
+
+    Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+    assertNotNull(stage);
+
+    // fire kill event
+    queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+    try {
+      cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+    }   finally {
+      queryMasterTask.stop();
     }
 
-    List<Stage> stages = Lists.newArrayList(query.getStages());
+    List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
     Stage lastStage = stages.get(stages.size() - 1);
 
     assertEquals(StageState.KILLED, lastStage.getSynchronizedState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 190beae..8f6f9ed 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -252,11 +252,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
 
   @Override
   public void close() {
-    getHandler().sendExceptions(getClass().getSimpleName() + "terminates all the connections");
-
     Channel channel = getChannel();
     if (channel != null && channel.isOpen()) {
       LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress());
+      /* channelInactive receives the event, and then the client terminates all the requests */
       channel.close().syncUninterruptibly();
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
deleted file mode 100644
index 3c054ad..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RetriesExhaustedException.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.List;
-
-public class RetriesExhaustedException extends RuntimeException {
-  private static final long serialVersionUID = 1876775844L;
-
-  public RetriesExhaustedException(final String msg) {
-    super(msg);
-  }
-
-  public RetriesExhaustedException(final String msg, final IOException e) {
-    super(msg, e);
-  }
-
-  /**
-   * Datastructure that allows adding more info around Throwable incident.
-   */
-  public static class ThrowableWithExtraContext {
-    private final Throwable t;
-    private final long when;
-    private final String extras;
-
-    public ThrowableWithExtraContext(final Throwable t, final long when,
-        final String extras) {
-      this.t = t;
-      this.when = when;
-      this.extras = extras;
-    }
- 
-    @Override
-    public String toString() {
-      return new Date(this.when).toString() + ", " + extras + ", " + t.toString();
-    }
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param callableVitals Details from the {@link ServerCallable} we were using
-   * when we got this exception.
-   * @param numTries The number of tries we made
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    super(getMessage(callableVitals, numTries, exceptions));
-  }
-
-  /**
-   * Create a new RetriesExhaustedException from the list of prior failures.
-   * @param numTries
-   * @param exceptions List of exceptions that failed before giving up
-   */
-  public RetriesExhaustedException(final int numTries,
-      final List<Throwable> exceptions) {
-    super(getMessage(numTries, exceptions));
-  }
-
-  private static String getMessage(String callableVitals, int numTries,
-      List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed contacting ");
-    buffer.append(callableVitals);
-    buffer.append(" after ");
-    buffer.append(numTries + 1);
-    buffer.append(" attempts.\nExceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-
-  private static String getMessage(final int numTries,
-      final List<Throwable> exceptions) {
-    StringBuilder buffer = new StringBuilder("Failed after attempts=");
-    buffer.append(numTries + 1);
-    buffer.append(", exceptions:\n");
-    for (Throwable t : exceptions) {
-      buffer.append(t.toString());
-      buffer.append("\n");
-    }
-    return buffer.toString();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
deleted file mode 100644
index 2804a03..0000000
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ServerCallable.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.rpc;
-
-import com.google.protobuf.ServiceException;
-
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.net.InetSocketAddress;
-
-public abstract class ServerCallable<T> {
-  protected InetSocketAddress addr;
-  protected long startTime;
-  protected long endTime;
-  protected Class<?> protocol;
-  protected boolean asyncMode;
-  protected boolean closeConn;
-  protected RpcClientManager manager;
-
-  public abstract T call(NettyClientBase client) throws Exception;
-
-  public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol,
-                        boolean asyncMode) {
-    this.manager = manager;
-    this.addr = addr;
-    this.protocol = protocol;
-    this.asyncMode = asyncMode;
-  }
-
-  public void beforeCall() {
-    this.startTime = System.currentTimeMillis();
-  }
-
-  public long getStartTime(){
-    return startTime;
-  }
-
-  public void afterCall() {
-    this.endTime = System.currentTimeMillis();
-  }
-
-  public long getEndTime(){
-    return endTime;
-  }
-
-  boolean abort = false;
-  public void abort() {
-    abort = true;
-  }
-  /**
-   * Run this instance with retries, timed waits,
-   * and refinds of missing regions.
-   *
-   * @return an object of type T
-   * @throws com.google.protobuf.ServiceException if a remote or network exception occurs
-   */
-
-  public T withRetries() throws ServiceException {
-    //TODO configurable
-    final long pause = 500; //ms
-    final int numRetries = 3;
-
-    for (int tries = 0; tries < numRetries; tries++) {
-      NettyClientBase client = null;
-      try {
-        beforeCall();
-        if(addr != null) {
-          client = manager.getClient(addr, protocol, asyncMode);
-        }
-        return call(client);
-      } catch (IOException ioe) {
-        if(abort) {
-          throw new ServiceException(ioe.getMessage(), ioe);
-        }
-        if (tries == numRetries - 1) {
-          throw new ServiceException("Giving up after tries=" + tries, ioe);
-        }
-      } catch (Throwable t) {
-        throw new ServiceException(t);
-      } finally {
-        afterCall();
-        if(closeConn) {
-          RpcClientManager.cleanup(client);
-        }
-      }
-      try {
-        Thread.sleep(pause * (tries + 1));
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new ServiceException("Giving up after tries=" + tries, e);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Run this instance against the server once.
-   * @return an object of type T
-   * @throws java.io.IOException if a remote or network exception occurs
-   * @throws RuntimeException other unspecified error
-   */
-  public T withoutRetries() throws IOException, RuntimeException {
-    NettyClientBase client = null;
-    try {
-      beforeCall();
-      client = manager.getClient(addr, protocol, asyncMode);
-      return call(client);
-    } catch (Throwable t) {
-      Throwable t2 = translateException(t);
-      if (t2 instanceof IOException) {
-        throw (IOException)t2;
-      } else {
-        throw new RuntimeException(t2);
-      }
-    } finally {
-      afterCall();
-      if(closeConn) {
-        RpcClientManager.cleanup(client);
-      }
-    }
-  }
-
-  private static Throwable translateException(Throwable t) throws IOException {
-    if (t instanceof UndeclaredThrowableException) {
-      t = t.getCause();
-    }
-    if (t instanceof RemoteException && t.getCause() != null) {
-      t = t.getCause();
-    }
-    return t;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
index 6f7fdd1..c86db80 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java
@@ -170,45 +170,6 @@ public class TestBlockingRpc {
   }
 
   @Test
-  @SetupRpcConnection(setupRpcClient = false)
-  @Deprecated // serverCallable will be remove
-  public void testRpcWithServiceCallable() throws Exception {
-    RpcClientManager manager = RpcClientManager.getInstance();
-    final SumRequest request = SumRequest.newBuilder()
-        .setX1(1)
-        .setX2(2)
-        .setX3(3.15d)
-        .setX4(2.0f).build();
-
-    SumResponse response =
-        new ServerCallable<SumResponse>(manager,
-            server.getListenAddress(), DummyProtocol.class, false) {
-          @Override
-          public SumResponse call(NettyClientBase client) throws Exception {
-            BlockingInterface stub2 = client.getStub();
-            SumResponse response1 = stub2.sum(null, request);
-            return response1;
-          }
-        }.withRetries();
-
-    assertEquals(8.15d, response.getResult(), 1e-15);
-
-    response =
-        new ServerCallable<SumResponse>(manager,
-            server.getListenAddress(), DummyProtocol.class, false) {
-          @Override
-          public SumResponse call(NettyClientBase client) throws Exception {
-            BlockingInterface stub2 = client.getStub();
-            SumResponse response1 = stub2.sum(null, request);
-            return response1;
-          }
-        }.withoutRetries();
-
-    assertTrue(8.15d == response.getResult());
-    RpcClientManager.close();
-  }
-
-  @Test
   public void testThrowException() throws Exception {
     EchoMessage message = EchoMessage.newBuilder()
         .setMessage(MESSAGE).build();


[3/6] tajo git commit: TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)

Posted by ji...@apache.org.
TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)

Closes #522


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b6b9d463
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b6b9d463
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b6b9d463

Branch: refs/heads/index_support
Commit: b6b9d46318d0aaa0346578b5ecf11b9d9ba74b0c
Parents: 4755410
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 12:11:37 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 12:11:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 14 ++---
 .../tajo/storage/text/CSVLineDeserializer.java  | 18 +++++-
 .../org/apache/tajo/storage/TestStorages.java   | 59 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 952f852..a790655 100644
--- a/CHANGES
+++ b/CHANGES
@@ -108,6 +108,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
+
     TAJO-1574: Fix NPE on natural join.
     (Contributed by Dongjoon Hyun, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 44aabd4..62e5ed9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1255,17 +1255,18 @@ public class RCFile {
 
       for (int i = 0; i < targetColumnIndexes.length; i++) {
         int tid = targetColumnIndexes[i];
+        SelectedColumn col = new SelectedColumn();
+        col.colIndex = tid;
         if (tid < columnNumber) {
           skippedColIDs[tid] = false;
-
-          SelectedColumn col = new SelectedColumn();
-          col.colIndex = tid;
           col.runLength = 0;
           col.prvLength = -1;
           col.rowReadIndex = 0;
-          selectedColumns[i] = col;
           colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+        }  else {
+          col.isNulled = true;
         }
+        selectedColumns[i] = col;
       }
 
       currentKey = createKeyBuffer();
@@ -1583,10 +1584,7 @@ public class RCFile {
 
       for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
         SelectedColumn col = selectedColumns[selIx];
-        if (col == null) {
-          col = new SelectedColumn();
-          col.isNulled = true;
-          selectedColumns[selIx] = col;
+        if (col.isNulled) {
           continue;
         }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 6a8c7a9..03a0a26 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufProcessor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.FieldSerializerDeserializer;
 import org.apache.tajo.storage.Tuple;
 
@@ -80,8 +81,14 @@ public class CSVLineDeserializer extends TextLineDeserializer {
 
       if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
         lineBuf.setIndex(start, start + fieldLength);
-        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-        output.put(currentIndex, datum);
+
+        try {
+          Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+          output.put(currentIndex, datum);
+        } catch (Exception e) {
+          output.put(currentIndex, NullDatum.get());
+        }
+
         currentTarget++;
       }
 
@@ -92,6 +99,13 @@ public class CSVLineDeserializer extends TextLineDeserializer {
       start = end + 1;
       currentIndex++;
     }
+
+    /* If a text row has fewer fields than the table schema, the remaining target columns are set to NullDatum */
+    if (projection.length > currentTarget) {
+      for (; currentTarget < projection.length; currentTarget++) {
+        output.put(projection[currentTarget], NullDatum.get());
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 790ac4a..456ea00 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -952,4 +952,63 @@ public class TestStorages {
       StorageManager.clearCache();
     }
   }
+
+  @Test
+  public void testLessThanSchemaSize() throws IOException {
+    /* RAW is an internal storage format, so its data must always match the schema size; AVRO is skipped as well */
+    if (storeType == StoreType.RAW || storeType == StoreType.AVRO){
+      return;
+    }
+
+    Schema dataSchema = new Schema();
+    dataSchema.addColumn("col1", Type.FLOAT4);
+    dataSchema.addColumn("col2", Type.FLOAT8);
+    dataSchema.addColumn("col3", Type.INT2);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+
+    Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
+    FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, dataSchema, tablePath);
+    appender.init();
+
+
+    Tuple expect = new VTuple(dataSchema.size());
+    expect.put(new Datum[]{
+        DatumFactory.createFloat4(Float.MAX_VALUE),
+        DatumFactory.createFloat8(Double.MAX_VALUE),
+        DatumFactory.createInt2(Short.MAX_VALUE)
+    });
+
+    appender.addTuple(expect);
+    appender.flush();
+    appender.close();
+
+    assertTrue(fs.exists(tablePath));
+    FileStatus status = fs.getFileStatus(tablePath);
+    Schema inSchema = new Schema();
+    inSchema.addColumn("col1", Type.FLOAT4);
+    inSchema.addColumn("col2", Type.FLOAT8);
+    inSchema.addColumn("col3", Type.INT2);
+    inSchema.addColumn("col4", Type.INT4);
+    inSchema.addColumn("col5", Type.INT8);
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, inSchema, fragment);
+
+    Schema target = new Schema();
+
+    target.addColumn("col2", Type.FLOAT8);
+    target.addColumn("col5", Type.INT8);
+    scanner.setTarget(target.toArray());
+    scanner.init();
+
+    Tuple tuple = scanner.next();
+    scanner.close();
+
+    assertEquals(expect.get(1), tuple.get(1));
+    assertEquals(NullDatum.get(), tuple.get(4));
+  }
 }


[5/6] tajo git commit: TAJO-1556: "insert into select" with reordered column list does not work.

Posted by ji...@apache.org.
TAJO-1556: "insert into select" with reordered column list does not work.

Closes #546

Signed-off-by: Jihoon Son <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9b3824b5
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9b3824b5
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9b3824b5

Branch: refs/heads/index_support
Commit: 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
Parents: 04167bd
Author: Yongjin Choi <su...@gmail.com>
Authored: Wed May 6 18:41:51 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed May 6 18:42:29 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +++
 .../java/org/apache/tajo/catalog/Schema.java    | 12 ++++++---
 .../tajo/engine/planner/TestLogicalPlanner.java | 23 +++++++++++++++--
 .../tajo/engine/query/TestInsertQuery.java      | 19 ++++++++++++++
 .../TestInsertQuery/nation_diff_col_order.ddl   |  1 +
 .../testInsertWithDifferentColumnOrder.sql      |  1 +
 .../testInsertWithDifferentColumnOrder.result   | 27 ++++++++++++++++++++
 .../org/apache/tajo/plan/LogicalPlanner.java    |  9 ++++---
 8 files changed, 86 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ebd88cd..a8074cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -111,6 +111,9 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1556: "insert into select" with reordered column list does not work.
+    (Contributed by Yongjin Choi, Committed by jihoon)
+
     TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
 
     TAJO-1574: Fix NPE on natural join.

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index fcbd177..054cc2c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -157,13 +157,19 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
   }
 
   public Column getColumn(Column column) {
+    int idx = getIndex(column);
+    return idx >= 0 ? fields.get(idx) : null;
+  }
+
+  public int getIndex(Column column) {
     if (!contains(column)) {
-      return null;
+      return -1;
     }
+
     if (column.hasQualifier()) {
-      return fields.get(fieldsByQualifiedName.get(column.getQualifiedName()));
+      return fieldsByQualifiedName.get(column.getQualifiedName());
     } else {
-      return fields.get(fieldsByName.get(column.getSimpleName()).get(0));
+      return fieldsByName.get(column.getSimpleName()).get(0);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 0b59bc7..af0aa6a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -1106,7 +1106,7 @@ public class TestLogicalPlanner {
   // Table descriptions
   //
   // employee (name text, empid int4, deptname text)
-  // dept (deptname text, nameger text)
+  // dept (deptname text, manager text)
   // score (deptname text, score inet4)
 
   static final String [] insertStatements = {
@@ -1115,7 +1115,8 @@ public class TestLogicalPlanner {
       "insert into employee (name, deptname) select * from dept",           // 2
       "insert into location '/tmp/data' select name, empid from employee",  // 3
       "insert overwrite into employee (name, deptname) select * from dept", // 4
-      "insert overwrite into LOCATION '/tmp/data' select * from dept"       // 5
+      "insert overwrite into LOCATION '/tmp/data' select * from dept",      // 5
+      "insert into employee (deptname, name) select deptname, manager from dept"  // 6
   };
 
   @Test
@@ -1198,6 +1199,24 @@ public class TestLogicalPlanner {
     assertTrue(insertNode.hasPath());
   }
 
+  @Test
+  public final void testInsertInto6() throws PlanningException {
+    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+
+    Expr expr = sqlAnalyzer.parse(insertStatements[6]);
+    LogicalPlan plan = planner.createPlan(qc, expr);
+    assertEquals(1, plan.getQueryBlocks().size());
+    InsertNode insertNode = getInsertNode(plan);
+
+    ProjectionNode subquery = insertNode.getChild();
+    Target[] targets = subquery.getTargets();
+    // targets MUST be manager, NULL as empid, deptname
+    assertEquals(targets[0].getNamedColumn().getQualifiedName(), "default.dept.manager");
+    assertEquals(targets[1].getAlias(), "empid");
+    assertEquals(targets[1].getEvalTree().getType(), EvalType.CONST);
+    assertEquals(targets[2].getNamedColumn().getQualifiedName(), "default.dept.deptname");
+  }
+
   private static InsertNode getInsertNode(LogicalPlan plan) {
     LogicalRootNode root = plan.getRootBlock().getRoot();
     assertEquals(NodeType.INSERT, root.getChild().getType());

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 4a1f601..b3e3402 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -817,4 +817,23 @@ public class TestInsertQuery extends QueryTestCaseBase {
     assertNotNull(resultDatas);
     assertEquals(expected, resultDatas);
   }
+
+  @Test
+  public final void testInsertWithDifferentColumnOrder() throws Exception {
+    ResultSet res = executeFile("nation_diff_col_order.ddl");
+    res.close();
+
+    CatalogService catalog = testingCluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(getCurrentDatabase(), "nation_diff"));
+
+    try {
+      res = executeFile("testInsertWithDifferentColumnOrder.sql");
+      res.close();
+
+      res = executeString("select * from nation_diff");
+      assertResultSet(res);
+    } finally {
+      executeString("drop table nation_diff purge;");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl b/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
new file mode 100644
index 0000000..6998304
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/nation_diff_col_order.ddl
@@ -0,0 +1 @@
+create table nation_diff (n_nationkey int8, n_name text, n_regionkey int8, n_comment text);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
new file mode 100644
index 0000000..ad360f9
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestInsertQuery/testInsertWithDifferentColumnOrder.sql
@@ -0,0 +1 @@
+insert overwrite into nation_diff (n_comment, n_name) select n_comment, n_name from default.nation;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
new file mode 100644
index 0000000..4cd3b81
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestInsertQuery/testInsertWithDifferentColumnOrder.result
@@ -0,0 +1,27 @@
+n_nationkey,n_name,n_regionkey,n_comment
+-------------------------------
+null,ALGERIA,null, haggle. carefully final deposits detect slyly agai
+null,ARGENTINA,null,al foxes promise slyly according to the regular accounts. bold requests alon
+null,BRAZIL,null,y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special 
+null,CANADA,null,eas hang ironic, silent packages. slyly regular packages are furiously over the tithes. fluffily bold
+null,EGYPT,null,y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d
+null,ETHIOPIA,null,ven packages wake quickly. regu
+null,FRANCE,null,refully final requests. regular, ironi
+null,GERMANY,null,l platelets. regular accounts x-ray: unusual, regular acco
+null,INDIA,null,ss excuses cajole slyly across the packages. deposits print aroun
+null,INDONESIA,null, slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull
+null,IRAN,null,efully alongside of the slyly final dependencies. 
+null,IRAQ,null,nic deposits boost atop the quickly final requests? quickly regula
+null,JAPAN,null,ously. final, express gifts cajole a
+null,JORDAN,null,ic deposits are blithely about the carefully regular pa
+null,KENYA,null, pending excuses haggle furiously deposits. pending, express pinto beans wake fluffily past t
+null,MOROCCO,null,rns. blithely bold courts among the closely regular packages use furiously bold platelets?
+null,MOZAMBIQUE,null,s. ironic, unusual asymptotes wake blithely r
+null,PERU,null,platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
+null,CHINA,null,c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos
+null,ROMANIA,null,ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
+null,SAUDI ARABIA,null,ts. silent requests haggle. closely express packages sleep across the blithely
+null,VIETNAM,null,hely enticingly express accounts. even, final 
+null,RUSSIA,null, requests against the platelets use never according to the quickly regular pint
+null,UNITED KINGDOM,null,eans boost carefully special requests. accounts are. carefull
+null,UNITED STATES,null,y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9b3824b5/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index d1c1a15..e0b4f7e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1524,7 +1524,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // we use only a sequence of preceding columns of target table's schema
     // as target columns.
     //
-    // For example, consider a target table and an 'insert into' query are give as follows:
+    // For example, consider a target table and an 'insert into' query are given as follows:
     //
     // CREATE TABLE TB1                  (col1 int,  col2 int, col3 long);
     //                                      ||          ||
@@ -1586,11 +1586,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       // Modifying projected columns by adding NULL constants
       // It is because that table appender does not support target columns to be written.
       List<Target> targets = TUtil.newList();
-      for (int i = 0, j = 0; i < tableSchema.size(); i++) {
+      for (int i = 0; i < tableSchema.size(); i++) {
         Column column = tableSchema.getColumn(i);
 
-        if(targetColumns.contains(column) && j < projectionNode.getTargets().length) {
-          targets.add(projectionNode.getTargets()[j++]);
+        int idxInProjectionNode = targetColumns.getIndex(column);
+        if (idxInProjectionNode >= 0 && idxInProjectionNode < projectionNode.getTargets().length) {
+          targets.add(projectionNode.getTargets()[idxInProjectionNode]);
         } else {
           targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName()));
         }


[4/6] tajo git commit: TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.

Posted by ji...@apache.org.
TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.

Closes #559


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/04167bdc
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/04167bdc
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/04167bdc

Branch: refs/heads/index_support
Commit: 04167bdc3bb04b53c5a245a9c18b6426ade82a26
Parents: b6b9d46
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 18:13:40 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 18:13:40 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 -
 .../org/apache/tajo/master/QueryInProgress.java |  31 ++---
 .../querymaster/QueryMasterManagerService.java  | 135 ++++++++-----------
 .../tajo/worker/ExecutionBlockContext.java      |  32 +++--
 .../java/org/apache/tajo/worker/TajoWorker.java |   1 +
 .../tajo/worker/TajoWorkerManagerService.java   |   2 +
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  43 +++---
 .../src/main/proto/QueryMasterProtocol.proto    |  14 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |   7 +-
 .../org/apache/tajo/rpc/RpcClientManager.java   |   9 ++
 12 files changed, 133 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index a790655..ebd88cd 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,6 +24,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1584: Remove QueryMaster client sharing in TajoMaster and TajoWorker.
+    (jinho)
+
     TAJO-1563: Improve RPC error handling. (jinho)
 
     TAJO-1311: Enable Scattered Hash Shuffle for CTAS statement. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index bfba290..46e7618 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -224,8 +224,6 @@ public class TajoConf extends Configuration {
     HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true, Validators.bool()),
 
     // RPC --------------------------------------------------------------------
-    RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
-
     //  Internal RPC Client
     INTERNAL_RPC_CLIENT_WORKER_THREAD_NUM("tajo.internal.rpc.client.worker-thread-num",
         Runtime.getRuntime().availableProcessors() * 2),

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..6a074a2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -31,14 +32,13 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.*;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.util.NetUtils;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -92,7 +92,9 @@ public class QueryInProgress {
     try {
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED);
       if (queryMasterRpcClient != null) {
-        queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+        CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+        queryMasterRpcClient.killQuery(callFuture.getController(), queryId.getProto(), callFuture);
+        callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
       }
     } catch (Throwable e) {
       catchException("Failed to kill query " + queryId + " by exception " + e, e);
@@ -111,9 +113,7 @@ public class QueryInProgress {
 
     masterContext.getResourceManager().releaseQueryMaster(queryId);
 
-    if(queryMasterRpc != null) {
-      RpcClientManager.cleanup(queryMasterRpc);
-    }
+    RpcClientManager.cleanup(queryMasterRpc);
 
     try {
       masterContext.getHistoryWriter().appendAndFlush(queryInfo);
@@ -156,8 +156,9 @@ public class QueryInProgress {
   private void connectQueryMaster() throws Exception {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
-    queryMasterRpc =
-        RpcClientManager.getInstance().getClient(addr, QueryMasterProtocol.class, true);
+
+    RpcClientManager.cleanup(queryMasterRpc);
+    queryMasterRpc = RpcClientManager.getInstance().newClient(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
@@ -177,11 +178,7 @@ public class QueryInProgress {
       if(queryMasterRpcClient == null) {
         connectQueryMaster();
       }
-      if(queryMasterRpcClient == null) {
-        LOG.info("No QueryMaster connection info.");
-        //TODO wait
-        return;
-      }
+
       LOG.info("Call executeQuery to :" +
           queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
 
@@ -192,11 +189,15 @@ public class QueryInProgress {
           .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
           .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
 
-      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      CallFuture<PrimitiveProtos.NullProto> callFuture = new CallFuture<PrimitiveProtos.NullProto>();
+      queryMasterRpcClient.executeQuery(callFuture.getController(), builder.build(), callFuture);
+      callFuture.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
       querySubmitted.set(true);
       getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
     } catch (Exception e) {
       LOG.error("Failed to submit query " + queryId + " to master by exception " + e, e);
+      catchException(e.getMessage(), e);
     } finally {
       writeLock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
index 85cc553..59933a7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java
@@ -115,10 +115,6 @@ public class QueryMasterManagerService extends CompositeService
     return bindAddr;
   }
 
-  public String getHostAndPort() {
-    return bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
   @Override
   public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
                       RpcCallback<TajoWorkerProtocol.TaskRequestProto> done) {
@@ -136,127 +132,106 @@ public class QueryMasterManagerService extends CompositeService
       }
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+      controller.setFailed(e.getMessage());
     }
   }
 
   @Override
   public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
-      TaskAttemptId attemptId = new TaskAttemptId(request.getId());
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
-      if (queryMasterTask == null) {
-        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
-      }
-      Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
-      Task task = sq.getTask(attemptId.getTaskId());
-      TaskAttempt attempt = task.getAttempt(attemptId.getId());
+                           RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryId queryId = new QueryId(request.getId().getTaskId().getExecutionBlockId().getQueryId());
+    TaskAttemptId attemptId = new TaskAttemptId(request.getId());
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+    if (queryMasterTask == null) {
+      queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+    }
+    Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId());
+    Task task = sq.getTask(attemptId.getTaskId());
+    TaskAttempt attempt = task.getAttempt(attemptId.getId());
 
-      if(LOG.isDebugEnabled()){
-        LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
-      }
+    if(LOG.isDebugEnabled()){
+      LOG.debug(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+    }
 
-      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
-        LOG.warn(attemptId + " Killed");
-        attempt.handle(
-            new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
-      } else {
-        queryMasterTask.getEventHandler().handle(
-            new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+    if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+      LOG.warn(attemptId + " Killed");
+      attempt.handle(
+          new TaskAttemptEvent(new TaskAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+    } else {
+      queryMasterTask.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(new TaskAttemptId(request.getId()), request));
     }
+
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void ping(RpcController controller,
                    TajoIdProtos.ExecutionBlockIdProto requestProto,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    done.run(TajoWorker.TRUE_PROTO);
+                   RpcCallback<PrimitiveProtos.NullProto> done) {
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
-                         RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.handleTaskFailed(report);
-      } else {
-        LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+                         RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+    if (queryMasterTask != null) {
+      queryMasterTask.handleTaskFailed(report);
+    } else {
+      LOG.warn("No QueryMasterTask: " + new TaskAttemptId(report.getId()));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
-                   RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
-      if (queryMasterTask != null) {
-        queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
-      }
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
+                   RpcCallback<PrimitiveProtos.NullProto> done) {
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
+        new QueryId(report.getId().getTaskId().getExecutionBlockId().getQueryId()));
+    if (queryMasterTask != null) {
+      queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void doneExecutionBlock(
       RpcController controller, TajoWorkerProtocol.ExecutionBlockReport request,
-      RpcCallback<PrimitiveProtos.BoolProto> done) {
+      RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId()));
     if (queryMasterTask != null) {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId());
       queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request));
     }
-    done.run(TajoWorker.TRUE_PROTO);
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
-                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+                        RpcCallback<PrimitiveProtos.NullProto> done) {
     QueryId queryId = new QueryId(request);
     QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
     if (queryMasterTask != null) {
-      Query query = queryMasterTask.getQuery();
-      if (query != null) {
-        query.handle(new QueryEvent(queryId, QueryEventType.KILL));
-      }
+      queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
     }
+    done.run(TajoWorker.NULL_PROTO);
   }
 
   @Override
   public void executeQuery(RpcController controller,
                            TajoWorkerProtocol.QueryExecutionRequestProto request,
-                           RpcCallback<PrimitiveProtos.BoolProto> done) {
-    try {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
-
-      QueryId queryId = new QueryId(request.getQueryId());
-      LOG.info("Receive executeQuery request:" + queryId);
-      queryMaster.handle(new QueryStartEvent(queryId,
-          new Session(request.getSession()),
-          new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
-              request.getQueryContext()), request.getExprInJson().getValue(),
-          request.getLogicalPlanJson().getValue()));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      workerContext.getWorkerSystemMetrics().counter("querymaster", "errorQuery").inc();
-      LOG.error(e.getMessage(), e);
-      done.run(TajoWorker.FALSE_PROTO);
-    }
+                           RpcCallback<PrimitiveProtos.NullProto> done) {
+    workerContext.getWorkerSystemMetrics().counter("querymaster", "numQuery").inc();
+
+    QueryId queryId = new QueryId(request.getQueryId());
+    LOG.info("Receive executeQuery request:" + queryId);
+    queryMaster.handle(new QueryStartEvent(queryId,
+        new Session(request.getSession()),
+        new QueryContext(workerContext.getQueryMaster().getContext().getConf(),
+            request.getQueryContext()), request.getExprInJson().getValue(),
+        request.getLogicalPlanJson().getValue()));
+    done.run(TajoWorker.NULL_PROTO);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 0d26e6c..cd4b6a6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import static org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
 
 public class ExecutionBlockContext {
   /** class logger */
@@ -78,6 +79,8 @@ public class ExecutionBlockContext {
   private TajoQueryEngine queryEngine;
   private RpcClientManager connManager;
   private InetSocketAddress qmMasterAddr;
+  private NettyClientBase client;
+  private QueryMasterProtocol.QueryMasterProtocolService.Interface stub;
   private WorkerConnectionInfo queryMaster;
   private TajoConf systemConf;
   // for the doAs block
@@ -132,16 +135,14 @@ public class ExecutionBlockContext {
 
     // initialize DFS and LocalFileSystems
     this.taskOwner = taskOwner;
+    this.stub = getRpcClient().getStub();
     this.reporter.startReporter();
-
     // resource intiailization
     try{
       this.resource.initialize(queryContext, plan);
     } catch (Throwable e) {
       try {
-        NettyClientBase client = getQueryMasterConnection();
-        QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
-        stub.killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
+        getStub().killQuery(null, executionBlockId.getQueryId().getProto(), NullCallback.get());
       } catch (Throwable t) {
         //ignore
       }
@@ -153,9 +154,20 @@ public class ExecutionBlockContext {
     return resource;
   }
 
-  public NettyClientBase getQueryMasterConnection()
+  private NettyClientBase getRpcClient()
       throws NoSuchMethodException, ConnectException, ClassNotFoundException {
-    return connManager.getClient(qmMasterAddr, QueryMasterProtocol.class, true);
+    if (client != null) return client;
+
+    client = connManager.newClient(qmMasterAddr, QueryMasterProtocol.class, true);
+    return client;
+  }
+
+  public Interface getStub() {
+    return stub;
+  }
+
+  public boolean isStopped() {
+    return stop.get();
   }
 
   public void stop(){
@@ -184,6 +196,7 @@ public class ExecutionBlockContext {
     tasks.clear();
 
     resource.release();
+    RpcClientManager.cleanup(client);
   }
 
   public TajoConf getConf() {
@@ -282,8 +295,7 @@ public class ExecutionBlockContext {
     /* This case is that worker did not ran tasks */
     if(completedTasksNum.get() == 0) return;
 
-    NettyClientBase client = getQueryMasterConnection();
-    QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub();
+    Interface stub = getStub();
 
     ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder();
     reporterBuilder.setEbId(ebId.getProto());
@@ -379,10 +391,8 @@ public class ExecutionBlockContext {
         public void run() {
           while (!reporterStop.get() && !Thread.interrupted()) {
 
-            NettyClientBase client = null;
             try {
-              client = getQueryMasterConnection();
-              QueryMasterProtocol.QueryMasterProtocolService.Interface masterStub = client.getStub();
+              Interface masterStub = getStub();
 
               if(tasks.size() == 0){
                 masterStub.ping(null, getExecutionBlockId().getProto(), NullCallback.get());

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 79b83e4..b666f80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -77,6 +77,7 @@ import static org.apache.tajo.conf.TajoConf.ConfVars;
 public class TajoWorker extends CompositeService {
   public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
   public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+  public static final PrimitiveProtos.NullProto NULL_PROTO = PrimitiveProtos.NullProto.newBuilder().build();
 
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 71d96c4..bbf8564 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -127,6 +127,7 @@ public class TajoWorkerManagerService extends CompositeService
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
+      controller.setFailed(t.getMessage());
       done.run(TajoWorker.FALSE_PROTO);
     }
   }
@@ -142,6 +143,7 @@ public class TajoWorkerManagerService extends CompositeService
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
+      controller.setFailed(e.getMessage());
       done.run(TajoWorker.FALSE_PROTO);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index a983f78..53ed73e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -440,9 +440,7 @@ public class Task {
       executionBlockContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);
 
-      NettyClientBase client = executionBlockContext.getQueryMasterConnection();
-
-      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = client.getStub();
+      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
       if (context.isStopped()) {
         context.setExecutorProgress(0.0f);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6076913..31f25f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -194,24 +194,8 @@ public class TaskRunner extends AbstractService {
           CallFuture<TaskRequestProto> callFuture = null;
           TaskRequestProto taskRequest = null;
 
-          while(!stopped) {
-            NettyClientBase client;
-            try {
-              client = executionBlockContext.getQueryMasterConnection();
-            } catch (ConnectException ce) {
-              // NettyClientBase throws ConnectTimeoutException if connection was failed
-              stop();
-              getContext().stopTaskRunner(getId());
-              LOG.error("Connecting to QueryMaster was failed.", ce);
-              break;
-            } catch (Throwable t) {
-              LOG.fatal("Unable to handle exception: " + t.getMessage(), t);
-              stop();
-              getContext().stopTaskRunner(getId());
-              break;
-            }
-
-            QueryMasterProtocolService.Interface qmClientService = client.getStub();
+          while(!stopped && !executionBlockContext.isStopped()) {
+            QueryMasterProtocolService.Interface qmClientService = executionBlockContext.getStub();
 
             try {
               if (callFuture == null) {
@@ -243,8 +227,12 @@ public class TaskRunner extends AbstractService {
                 }
                 continue;
               } catch (ExecutionException ee) {
-                LOG.error(ee.getMessage(), ee);
-                break;
+                if(!getContext().isStopped()){
+                  LOG.error(ee.getMessage(), ee);
+                } else {
+                  /* EB is stopped */
+                  break;
+                }
               }
 
               if (taskRequest != null) {
@@ -253,9 +241,6 @@ public class TaskRunner extends AbstractService {
                 // immediately.
                 if (taskRequest.getShouldDie()) {
                   LOG.info("Received ShouldDie flag:" + getId());
-                  stop();
-                  //notify to TaskRunnerManager
-                  getContext().stopTaskRunner(getId());
                 } else {
                   getContext().getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
                   LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -268,7 +253,7 @@ public class TaskRunner extends AbstractService {
                   }
 
                   LOG.info("Initializing: " + taskAttemptId);
-                  Task task;
+                  Task task = null;
                   try {
                     task = new Task(getId(), getTaskBaseDir(), taskAttemptId, executionBlockContext,
                         new TaskRequestImpl(taskRequest));
@@ -283,20 +268,22 @@ public class TaskRunner extends AbstractService {
                   } catch (Throwable t) {
                     LOG.error(t.getMessage(), t);
                     fatalError(qmClientService, taskAttemptId, t.getMessage());
+                    if(task != null) {
+                      task.cleanupTask();
+                    }
                   } finally {
                     callFuture = null;
                     taskRequest = null;
                   }
                 }
-              } else {
-                stop();
-                //notify to TaskRunnerManager
-                getContext().stopTaskRunner(getId());
               }
             } catch (Throwable t) {
               LOG.fatal(t.getMessage(), t);
             }
           }
+          stop();
+          //notify to TaskRunnerManager
+          getContext().stopTaskRunner(getId());
         }
       });
       taskLauncher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index ae20309..855c2c6 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -34,13 +34,13 @@ package hadoop.yarn;
 service QueryMasterProtocolService {
   //from Worker
   rpc getTask(GetTaskRequestProto) returns (TaskRequestProto);
-  rpc statusUpdate (TaskStatusProto) returns (BoolProto);
-  rpc ping (ExecutionBlockIdProto) returns (BoolProto);
-  rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
-  rpc done (TaskCompletionReport) returns (BoolProto);
-  rpc doneExecutionBlock(ExecutionBlockReport) returns (BoolProto);
+  rpc statusUpdate (TaskStatusProto) returns (NullProto);
+  rpc ping (ExecutionBlockIdProto) returns (NullProto);
+  rpc fatalError(TaskFatalErrorReport) returns (NullProto);
+  rpc done (TaskCompletionReport) returns (NullProto);
+  rpc doneExecutionBlock(ExecutionBlockReport) returns (NullProto);
 
   //from TajoMaster's QueryJobManager
-  rpc killQuery(QueryIdProto) returns (BoolProto);
-  rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+  rpc killQuery(QueryIdProto) returns (NullProto);
+  rpc executeQuery(QueryExecutionRequestProto) returns (NullProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 8f6f9ed..0d86527 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -195,7 +195,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
       if (maxRetries > retries) {
         retries++;
 
-        LOG.warn(future.cause() + " Try to reconnect : " + getKey().addr);
+        LOG.warn(getErrorMessage(ExceptionUtils.getMessage(future.cause())) + " Try to reconnect : " + getKey().addr);
         try {
           Thread.sleep(RpcConstants.DEFAULT_PAUSE);
         } catch (InterruptedException e) {
@@ -246,8 +246,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
 
   private String getErrorMessage(String message) {
     return "Exception [" + getKey().protocolClass.getCanonicalName() +
-        "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)
-        getChannel().remoteAddress()) + ")]: " + message;
+        "(" + getKey().addr + ")]: " + message;
   }
 
   @Override
@@ -332,7 +331,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable
         throws Exception {
 
       Throwable rootCause = ExceptionUtils.getRootCause(cause);
-      LOG.error(getKey().addr + "," + getKey().protocolClass + "," + ExceptionUtils.getMessage(rootCause), rootCause);
+      LOG.error(getErrorMessage(ExceptionUtils.getMessage(rootCause)), rootCause);
 
       if (cause instanceof RecoverableException) {
         sendException((RecoverableException) cause);

http://git-wip-us.apache.org/repos/asf/tajo/blob/04167bdc/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
index f8def7f..111754e 100644
--- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
+++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java
@@ -130,6 +130,15 @@ public class RpcClientManager {
     return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing);
   }
 
+  public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr,
+                                                              Class<?> protocolClass,
+                                                              boolean asyncMode)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException {
+
+    return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode),
+        retries, getTimeoutSeconds(), TimeUnit.SECONDS, true);
+  }
+
   public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key,
                                                               int retries,
                                                               long timeout,


[2/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)

Posted by ji...@apache.org.
TAJO-1583: Remove ServerCallable in RPC client. (jinho)

Closes #556


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47554105
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47554105
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47554105

Branch: refs/heads/index_support
Commit: 475541057891518e08e5a18ebbbf916c1ad60c10
Parents: 9540f16
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Apr 30 16:51:56 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Apr 30 16:51:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     | 569 ++++++-------------
 .../org/apache/tajo/catalog/CatalogClient.java  |  49 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   8 +-
 .../tajo/catalog/LocalCatalogWrapper.java       |  20 +-
 .../tajo/client/CatalogAdminClientImpl.java     | 236 +++-----
 .../org/apache/tajo/client/QueryClientImpl.java | 328 +++++------
 .../apache/tajo/client/SessionConnection.java   | 275 ++++-----
 .../cli/tsql/TestDefaultCliOutputFormatter.java |   4 -
 .../apache/tajo/querymaster/TestKillQuery.java  |  63 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |   3 +-
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ----
 .../org/apache/tajo/rpc/ServerCallable.java     | 148 -----
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  39 --
 14 files changed, 618 insertions(+), 1230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8bda2bd..952f852 100644
--- a/CHANGES
+++ b/CHANGES
@@ -214,6 +214,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1583: Remove ServerCallable in RPC client. (jinho)
+
     TAJO-1587: Upgrade java version to 1.7 for Travis CI. (jihoon)
 
     TAJO-1559: Fix data model description (tinyint, smallint).

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 49be29a..766f6c2 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,16 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.ProtoUtil;
 
-import java.net.InetSocketAddress;
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -46,50 +42,27 @@ import java.util.List;
 /**
  * CatalogClient provides a client API to access the catalog server.
  */
-public abstract class AbstractCatalogClient implements CatalogService {
-  private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+public abstract class AbstractCatalogClient implements CatalogService, Closeable {
+  protected final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
 
-  protected ServiceTracker serviceTracker;
-  protected RpcClientManager manager;
-  protected InetSocketAddress catalogServerAddr;
   protected TajoConf conf;
 
-  abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
-
-  public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
-    this.manager = RpcClientManager.getInstance();
-    this.catalogServerAddr = catalogServerAddr;
-    this.serviceTracker = ServiceTrackerFactory.get(conf);
+  public AbstractCatalogClient(TajoConf conf) {
     this.conf = conf;
   }
 
-  private InetSocketAddress getCatalogServerAddr() {
-    if (catalogServerAddr == null) {
-      return null;
-    } else {
-
-      if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-        return catalogServerAddr;
-      } else {
-        return serviceTracker.getCatalogAddress();
-      }
-    }
-  }
+  abstract CatalogProtocolService.BlockingInterface getStub() throws ServiceException;
 
   @Override
   public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
+      CatalogProtocolService.BlockingInterface stub = getStub();
 
-          CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
-          builder.setTablespaceName(tablespaceName);
-          builder.setTablespaceUri(tablespaceUri);
-          return stub.createTablespace(null, builder.build()).getValue();
-        }
-      }.withRetries();
-    } catch (ServiceException e) {
+      CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
+      builder.setTablespaceName(tablespaceName);
+      builder.setTablespaceUri(tablespaceUri);
+      return stub.createTablespace(null, builder.build()).getValue();
+    } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
     }
@@ -98,12 +71,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -113,12 +82,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -128,46 +93,32 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllTablespaceNames() {
     try {
-      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
   
   @Override
   public List<TablespaceProto> getAllTablespaces() {
     try {
-      return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TablespaceProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
-          return response.getTablespaceList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
+      return response.getTablespaceList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TablespaceProto>();
     }
   }
 
   @Override
   public TablespaceProto getTablespace(final String tablespaceName) {
     try {
-      return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public TablespaceProto call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -177,12 +128,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.alterTablespace(null, alterTablespace).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.alterTablespace(null, alterTablespace).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -192,18 +139,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
+      CatalogProtocolService.BlockingInterface stub = getStub();
 
-          CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
-          builder.setDatabaseName(databaseName);
-          if (tablespaceName != null) {
-            builder.setTablespaceName(tablespaceName);
-          }
-          return stub.createDatabase(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
+      builder.setDatabaseName(databaseName);
+      if (tablespaceName != null) {
+        builder.setTablespaceName(tablespaceName);
+      }
+      return stub.createDatabase(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -213,12 +156,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean dropDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -228,12 +167,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Boolean existDatabase(final String databaseName) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return Boolean.FALSE;
@@ -243,50 +178,36 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final Collection<String> getAllDatabaseNames() {
     try {
-      return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
   
   @Override
   public List<DatabaseProto> getAllDatabases() {
     try {
-      return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<DatabaseProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
-          return response.getDatabaseList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
+      return response.getDatabaseList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<DatabaseProto>();
     }
   }
 
   @Override
   public final TableDesc getTableDesc(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public TableDesc call(NettyClientBase client) throws ServiceException {
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -302,89 +223,60 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<TableDescriptorProto> getAllTables() {
     try {
-      return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableDescriptorProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
-          return response.getTableList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
+      return response.getTableList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableDescriptorProto>();
     }
   }
   
   @Override
   public List<TableOptionProto> getAllTableOptions() {
     try {
-      return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableOptionProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
-          return response.getTableOptionList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
+      return response.getTableOptionList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableOptionProto>();
     }
   }
   
   @Override
   public List<TableStatsProto> getAllTableStats() {
     try {
-      return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TableStatsProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
-          return response.getStatList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
+      return response.getStatList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TableStatsProto>();
     }
   }
   
   @Override
   public List<ColumnProto> getAllColumns() {
     try {
-      return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<ColumnProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
-          return response.getColumnList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
+      return response.getColumnList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<ColumnProto>();
     }
   }
 
   @Override
   public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null,  builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -394,17 +286,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existPartitionMethod(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existPartitionMethod(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existPartitionMethod(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -415,18 +302,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public final PartitionDescProto getPartition(final String databaseName, final String tableName,
                                                final String partitionName) {
     try {
-      return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public PartitionDescProto call(NettyClientBase client) throws ServiceException {
-
-          PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
-          builder.setPartitionName(partitionName);
+      PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
+      builder.setPartitionName(partitionName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.getPartitionByPartitionName(null, builder.build());
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.getPartitionByPartitionName(null, builder.build());
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -436,94 +318,70 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
     try {
-      return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class,
-        false) {
-        public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
+      PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
-          return response.getPartitionList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
+      return response.getPartitionList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<PartitionDescProto>();
     }
   }
   @Override
   public List<TablePartitionProto> getAllPartitions() {
     try {
-      return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<TablePartitionProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
-          return response.getPartList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
+      return response.getPartList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<TablePartitionProto>();
     }
   }
 
   @Override
   public final Collection<String> getAllTableNames(final String databaseName) {
     try {
-      return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<String> call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
-          return ProtoUtil.convertStrings(response);
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
+      return ProtoUtil.convertStrings(response);
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<String>();
     }
   }
 
   @Override
   public final Collection<FunctionDesc> getFunctions() {
-    try {
-      return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
-          List<FunctionDesc> list = new ArrayList<FunctionDesc>();
-          GetFunctionsResponse response;
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          response = stub.getFunctions(null, NullProto.newBuilder().build());
-          int size = response.getFunctionDescCount();
-          for (int i = 0; i < size; i++) {
-            try {
-              list.add(new FunctionDesc(response.getFunctionDesc(i)));
-            } catch (ClassNotFoundException e) {
-              LOG.error(e, e);
-              return null;
-            }
-          }
+    List<FunctionDesc> list = new ArrayList<FunctionDesc>();
+    try {
+      GetFunctionsResponse response;
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      response = stub.getFunctions(null, NullProto.newBuilder().build());
+      int size = response.getFunctionDescCount();
+      for (int i = 0; i < size; i++) {
+        try {
+          list.add(new FunctionDesc(response.getFunctionDesc(i)));
+        } catch (ClassNotFoundException e) {
+          LOG.error(e, e);
           return list;
         }
-      }.withRetries();
+      }
+      return list;
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return list;
     }
   }
 
   @Override
   public final boolean createTable(final TableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createTable(null, desc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createTable(null, desc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -537,17 +395,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
     final String simpleName = splitted[1];
 
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(simpleName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(simpleName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropTable(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropTable(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -561,17 +414,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
           "tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
     }
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-
-          TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setTableName(tableName);
+      TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setTableName(tableName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existsTable(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existsTable(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -586,12 +434,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean createIndex(final IndexDesc index) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createIndex(null, index.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createIndex(null, index.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -601,16 +445,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean existIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existIndexByName(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existIndexByName(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -620,17 +460,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
 
-          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
-          builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-          builder.setColumnName(columnName);
+      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+      builder.setColumnName(columnName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.existIndexByColumn(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.existIndexByColumn(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -640,17 +476,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
     try {
-      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public IndexDesc call(NettyClientBase client) throws ServiceException {
-
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return new IndexDesc(stub.getIndexByName(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return new IndexDesc(stub.getIndexByName(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -662,17 +493,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
                                           final String tableName,
                                           final String columnName) {
     try {
-      return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public IndexDesc call(NettyClientBase client) throws ServiceException {
+      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+      builder.setColumnName(columnName);
 
-          GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
-          builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-          builder.setColumnName(columnName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return null;
@@ -683,17 +509,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
   public boolean dropIndex(final String databaseName,
                            final String indexName) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
+      IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+      builder.setDatabaseName(databaseName);
+      builder.setIndexName(indexName);
 
-          IndexNameProto.Builder builder = IndexNameProto.newBuilder();
-          builder.setDatabaseName(databaseName);
-          builder.setIndexName(indexName);
-
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropIndex(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropIndex(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -703,30 +524,20 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public List<IndexProto> getAllIndexes() {
     try {
-      return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
-        @Override
-        public List<IndexProto> call(NettyClientBase client) throws Exception {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
-          return response.getIndexList();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
+      return response.getIndexList();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return null;
+      return new ArrayList<IndexProto>();
     }
   }
 
   @Override
   public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.createFunction(null, funcDesc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.createFunction(null, funcDesc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -736,15 +547,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final boolean dropFunction(final String signature) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
-          builder.setSignature(signature);
+      UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
+      builder.setSignature(signature);
 
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.dropFunction(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.dropFunction(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -769,24 +576,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
 
     FunctionDescProto descProto = null;
     try {
-      descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public FunctionDescProto call(NettyClientBase client) throws ServiceException {
-          try {
-            CatalogProtocolService.BlockingInterface stub = getStub(client);
-            return stub.getFunctionMeta(null, builder.build());
-          } catch (NoSuchFunctionException e) {
-            abort();
-            throw e;
-          }
-        }
-      }.withRetries();
-    } catch(ServiceException e) {
-      // this is not good. we need to define user massage exception
-      if(e.getCause() instanceof NoSuchFunctionException){
-        LOG.debug(e.getMessage());
-      } else {
-        LOG.error(e.getMessage(), e);
-      }
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      descProto = stub.getFunctionMeta(null, builder.build());
+    } catch (NoSuchFunctionException e) {
+      LOG.debug(e.getMessage());
+    } catch (ServiceException e) {
+      LOG.error(e.getMessage(), e);
     }
 
     if (descProto == null) {
@@ -819,27 +614,21 @@ public abstract class AbstractCatalogClient implements CatalogService {
     }
 
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.containFunction(null, builder.build()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.containFunction(null, builder.build()).getValue();
+    } catch (InvalidOperationException e) {
+      LOG.error(e.getMessage());
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
-      return false;
     }
+    return false;
   }
 
   @Override
   public final boolean alterTable(final AlterTableDesc desc) {
     try {
-      return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.alterTable(null, desc.getProto()).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.alterTable(null, desc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;
@@ -849,12 +638,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
     try {
-      return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-        public Boolean call(NettyClientBase client) throws ServiceException {
-          CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.updateTableStats(null, updateTableStatsProto).getValue();
-        }
-      }.withRetries();
+      CatalogProtocolService.BlockingInterface stub = getStub();
+      return stub.updateTableStats(null, updateTableStatsProto).getValue();
     } catch (ServiceException e) {
       LOG.error(e.getMessage(), e);
       return false;

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 7666a97..80ded4a 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -18,35 +18,72 @@
 
 package org.apache.tajo.catalog;
 
+import com.google.protobuf.ServiceException;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.NetUtils;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 
 /**
  * CatalogClient provides a client API to access the catalog server.
  */
 public class CatalogClient extends AbstractCatalogClient {
+  protected NettyClientBase client;
+  protected ServiceTracker serviceTracker;
+  protected InetSocketAddress catalogServerAddr;
   /**
    * @throws java.io.IOException
    *
    */
   public CatalogClient(final TajoConf conf) throws IOException {
-    super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)));
+    super(conf);
+    this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS));
+    this.serviceTracker = ServiceTrackerFactory.get(conf);
   }
 
-  public CatalogClient(TajoConf conf, String host, int port) throws IOException {
-    super(conf, NetUtils.createSocketAddr(host, port));
-  }
 
   @Override
-  BlockingInterface getStub(NettyClientBase client) {
-    return client.getStub();
+  BlockingInterface getStub() throws ServiceException {
+    return getCatalogConnection().getStub();
+  }
+
+  private InetSocketAddress getCatalogServerAddr() {
+    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+      return catalogServerAddr;
+    } else {
+      return serviceTracker.getCatalogAddress();
+    }
   }
 
+  public synchronized NettyClientBase getCatalogConnection() throws ServiceException {
+    if (client != null && client.isConnected()) return client;
+    else {
+      try {
+        if (client != null && client.isConnected()) return client;
+        RpcClientManager.cleanup(client);
+
+        int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES);
+        // The client is not closed when idle, in order to support high availability
+        this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false,
+            retry, 0, TimeUnit.SECONDS, false);
+      } catch (Exception e) {
+        throw new ServiceException(e);
+      }
+      return client;
+    }
+  }
+
+  @Override
   public void close() {
+    RpcClientManager.cleanup(client);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e9fb177..f2e9795 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,6 @@ import org.apache.tajo.annotation.ThreadSafe;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
 import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.store.CatalogStore;
 import org.apache.tajo.catalog.store.DerbyStore;
@@ -61,7 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 
 /**
  * This class provides the catalog service. The catalog service enables clients
@@ -1192,7 +1190,7 @@ public class CatalogServer extends AbstractService {
       if (functions.containsKey(funcDesc.getSignature())) {
         FunctionDescProto found = findFunctionStrictType(funcDesc, true);
         if (found != null) {
-          throw new AlreadyExistsFunctionException(signature.toString());
+          throw new ServiceException(new AlreadyExistsFunctionException(signature.toString()));
         }
       }
 
@@ -1209,7 +1207,7 @@ public class CatalogServer extends AbstractService {
         throws ServiceException {
 
       if (!containFunction(request.getSignature())) {
-        throw new NoSuchFunctionException(request.getSignature(), new DataType[] {});
+        throw new ServiceException(new NoSuchFunctionException(request.getSignature(), new DataType[]{}));
       }
 
       functions.remove(request.getSignature());
@@ -1231,7 +1229,7 @@ public class CatalogServer extends AbstractService {
       }
 
       if (function == null) {
-        throw new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList());
+        throw new ServiceException(new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList()));
       } else {
         return function;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
index df9bd2c..35e9e2e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
@@ -22,9 +22,6 @@
 package org.apache.tajo.catalog;
 
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-
-import java.io.IOException;
 
 /**
  * This class provides a catalog service interface in
@@ -34,20 +31,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
   private CatalogServer catalog;
   private CatalogProtocol.CatalogProtocolService.BlockingInterface stub;
 
-  public LocalCatalogWrapper(final TajoConf conf) throws IOException {
-    super(conf, null);
-    this.catalog = new CatalogServer();
-    this.catalog.init(conf);
-    this.catalog.start();
-    this.stub = catalog.getHandler();
-  }
-
   public LocalCatalogWrapper(final CatalogServer server) {
     this(server, server.getConf());
   }
 
   public LocalCatalogWrapper(final CatalogServer server, final TajoConf conf) {
-    super(conf, null);
+    super(conf);
     this.catalog = server;
     this.stub = server.getHandler();
   }
@@ -57,7 +46,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
   }
 
   @Override
-  CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) {
+  CatalogProtocol.CatalogProtocolService.BlockingInterface getStub() {
     return stub;
   }
+
+  @Override
+  public void close() {
+    //nothing to do
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 9d0e427..9397fcf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -27,10 +27,8 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.SQLStates;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
 
 import java.io.IOException;
 import java.net.URI;
@@ -48,79 +46,45 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
 
   @Override
   public boolean createDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMaster = client.getStub();
-        return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMaster = client.getStub();
+    return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public boolean existDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMaster = client.getStub();
-        return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMaster = client.getStub();
+    return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public boolean dropDatabase(final String databaseName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
   }
 
   @Override
   public List<String> getAllDatabaseNames() throws ServiceException {
 
-    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public List<String> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
   }
 
   public boolean existTable(final String tableName) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+    return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
   }
 
   @Override
@@ -133,32 +97,25 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
                                        final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
       throws SQLException, ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setName(tableName);
-        builder.setSchema(schema.getProto());
-        builder.setMeta(meta.getProto());
-        builder.setPath(path.toString());
-        if (partitionMethodDesc != null) {
-          builder.setPartition(partitionMethodDesc.getProto());
-        }
-        ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setName(tableName);
+    builder.setSchema(schema.getProto());
+    builder.setMeta(meta.getProto());
+    builder.setPath(path.toString());
+    if (partitionMethodDesc != null) {
+      builder.setPartition(partitionMethodDesc.getProto());
+    }
+    ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return CatalogUtil.newTableDesc(res.getTableDesc());
+    } else {
+      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+    }
   }
 
   @Override
@@ -169,94 +126,67 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
   @Override
   public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setName(tableName);
-        builder.setPurge(purge);
-        return tajoMasterService.dropTable(null, builder.build()).getValue();
-      }
-
-    }.withRetries();
+    ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setName(tableName);
+    builder.setPurge(purge);
+    return tajoMasterService.dropTable(null, builder.build()).getValue();
 
   }
 
   @Override
   public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
-    return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public List<String> call(NettyClientBase client) throws ServiceException {
 
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        if (databaseName != null) {
-          builder.setDatabaseName(databaseName);
-        }
-        ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
-        return res.getTablesList();
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    if (databaseName != null) {
+      builder.setDatabaseName(databaseName);
+    }
+    ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+    return res.getTablesList();
   }
 
   @Override
   public TableDesc getTableDesc(final String tableName) throws ServiceException {
 
-    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setTableName(tableName);
-        ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setTableName(tableName);
+    ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return CatalogUtil.newTableDesc(res.getTableDesc());
+    } else {
+      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
+    }
   }
 
   @Override
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
 
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
-        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
-      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
-        connection.checkSessionAndGet(client);
-        BlockingInterface tajoMasterService = client.getStub();
-
-        String paramFunctionName = functionName == null ? "" : functionName;
-        ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
-            connection.convertSessionedString(paramFunctionName));
-        if (res.getResultCode() == ClientProtos.ResultCode.OK) {
-          return res.getFunctionsList();
-        } else {
-          throw new SQLException(res.getErrorMessage());
-        }
-      }
-
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    BlockingInterface tajoMasterService = client.getStub();
+
+    String paramFunctionName = functionName == null ? "" : functionName;
+    ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+        connection.convertSessionedString(paramFunctionName));
+    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+      return res.getFunctionsList();
+    } else {
+      throw new ServiceException(new SQLException(res.getErrorMessage()));
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..53889fe 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -28,11 +28,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOException;
@@ -40,6 +39,7 @@ import java.net.InetSocketAddress;
 import java.sql.ResultSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.tajo.ipc.ClientProtos.*;
 import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
@@ -102,7 +102,7 @@ public class QueryClientImpl implements QueryClient {
   public void closeNonForwardQuery(QueryId queryId) {
     NettyClientBase tmClient = null;
     try {
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub();
       connection.checkSessionAndGet(tmClient);
 
@@ -153,50 +153,37 @@ public class QueryClientImpl implements QueryClient {
 
   @Override
   public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    final QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(sql);
+    builder.setIsJson(false);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
 
-        connection.checkSessionAndGet(client);
-
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
-        SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
-        if (response.getResultCode() == ResultCode.OK) {
-          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-        }
-        return response;
-      }
-    }.withRetries();
+    SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+    if (response.getResultCode() == ResultCode.OK) {
+      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+    }
+    return response;
   }
 
   @Override
   public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
 
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    final QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(json);
+    builder.setIsJson(true);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
 
-        return tajoMasterService.submitQuery(null, builder.build());
-      }
-    }.withRetries();
+    return tajoMasterService.submitQuery(null, builder.build());
   }
 
   @Override
@@ -308,7 +295,7 @@ public class QueryClientImpl implements QueryClient {
 
     NettyClientBase tmClient = null;
     try {
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       connection.checkSessionAndGet(tmClient);
       builder.setSessionId(connection.sessionId);
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -348,7 +335,7 @@ public class QueryClientImpl implements QueryClient {
 
     try {
 
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       connection.checkSessionAndGet(tmClient);
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
@@ -369,42 +356,26 @@ public class QueryClientImpl implements QueryClient {
       throws ServiceException {
 
     try {
-      final ServerCallable<ClientProtos.SerializedResultSet> callable =
-          new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
-              TajoMasterClientProtocol.class, false) {
-
-            public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
-              connection.checkSessionAndGet(client);
-              TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-              GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
-              builder.setSessionId(connection.sessionId);
-              builder.setQueryId(queryId.getProto());
-              builder.setFetchRowNum(fetchRowNum);
-              try {
-                GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
-                if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-                  abort();
-                  throw new ServiceException(response.getErrorMessage());
-                }
-
-                return response.getResultSet();
-              } catch (ServiceException e) {
-                abort();
-                throw e;
-              } catch (Throwable t) {
-                throw new ServiceException(t.getMessage(), t);
-              }
-            }
-          };
-
-      ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
+      NettyClientBase client = connection.getTajoMasterConnection();
+      connection.checkSessionAndGet(client);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+      GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+      builder.setSessionId(connection.sessionId);
+      builder.setQueryId(queryId.getProto());
+      builder.setFetchRowNum(fetchRowNum);
+
+      GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+      if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+        throw new ServiceException(response.getErrorMessage());
+      }
+
+      ClientProtos.SerializedResultSet resultSet = response.getResultSet();
 
       return new TajoMemoryResultSet(queryId,
-          new Schema(serializedResultSet.getSchema()),
-          serializedResultSet.getSerializedTuplesList(),
-          serializedResultSet.getSerializedTuplesCount(),
+          new Schema(resultSet.getSchema()),
+          resultSet.getSerializedTuplesList(),
+          resultSet.getSerializedTuplesCount(),
           getClientSideSessionVars());
     } catch (ServiceException e) {
       throw e;
@@ -416,119 +387,86 @@ public class QueryClientImpl implements QueryClient {
   @Override
   public boolean updateQuery(final String sql) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public Boolean call(NettyClientBase client) throws ServiceException {
+    QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(sql);
+    builder.setIsJson(false);
+    ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
 
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
-          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
+    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+      return true;
+    } else {
+      if (response.hasErrorMessage()) {
+        LOG.error("ERROR: " + response.getErrorMessage());
       }
-    }.withRetries();
+      return false;
+    }
   }
 
   @Override
   public boolean updateQueryWithJson(final String json) throws ServiceException {
 
-    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-        if (response.getResultCode() == ClientProtos.ResultCode.OK) {
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+    QueryRequest.Builder builder = QueryRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQuery(json);
+    builder.setIsJson(true);
+    ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+      return true;
+    } else {
+      if (response.hasErrorMessage()) {
+        LOG.error("ERROR: " + response.getErrorMessage());
       }
-    }.withRetries();
+      return false;
+    }
   }
 
   @Override
   public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
-        return res.getQueryListList();
-
-      }
-    }.withRetries();
+    ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
+    return res.getQueryListList();
   }
 
   @Override
   public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
-        return res.getQueryListList();
-
-      }
-    }.withRetries();
+    ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
+    return res.getQueryListList();
   }
 
   @Override
   public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
 
-    return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 
-      public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
-
-        connection.checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
-        return res.getWorkerListList();
-      }
-
-    }.withRetries();
+    ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
+    return res.getWorkerListList();
   }
 
   @Override
@@ -540,7 +478,7 @@ public class QueryClientImpl implements QueryClient {
     NettyClientBase tmClient = null;
     try {
       /* send a kill to the TM */
-      tmClient = connection.getTajoMasterConnection(false);
+      tmClient = connection.getTajoMasterConnection();
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
       connection.checkSessionAndGet(tmClient);
@@ -581,25 +519,20 @@ public class QueryClientImpl implements QueryClient {
   }
   
   public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
-    return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false) {
-      public QueryInfoProto call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
-
-        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQueryId(queryId.getProto());
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return res.getQueryInfo();
-        } else {
-          abort();
-          throw new ServiceException(res.getErrorMessage());
-        }
-      }
-    }.withRetries();
+    NettyClientBase client = connection.getTajoMasterConnection();
+    connection.checkSessionAndGet(client);
+
+    QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+    builder.setSessionId(connection.sessionId);
+    builder.setQueryId(queryId.getProto());
+
+    TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+    GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
+    if (res.getResultCode() == ResultCode.OK) {
+      return res.getQueryInfo();
+    } else {
+      throw new ServiceException(res.getErrorMessage());
+    }
   }
 
   public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@ -611,24 +544,31 @@ public class QueryClientImpl implements QueryClient {
     InetSocketAddress qmAddress = new InetSocketAddress(
         queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
 
-    return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
-        QueryMasterClientProtocol.class, false) {
-      public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
-        connection.checkSessionAndGet(client);
+    RpcClientManager manager = RpcClientManager.getInstance();
+    NettyClientBase queryMasterClient;
+    try {
+      queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+          manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
 
-        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-        builder.setSessionId(connection.sessionId);
-        builder.setQueryId(queryId.getProto());
+    try {
+      connection.checkSessionAndGet(connection.getTajoMasterConnection());
 
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
-        GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return res.getQueryHistory();
-        } else {
-          abort();
-          throw new ServiceException(res.getErrorMessage());
-        }
+      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+      builder.setSessionId(connection.sessionId);
+      builder.setQueryId(queryId.getProto());
+
+      QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+      GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
+      if (res.getResultCode() == ResultCode.OK) {
+        return res.getQueryHistory();
+      } else {
+        throw new ServiceException(res.getErrorMessage());
       }
-    }.withRetries();
+    } finally {
+      queryMasterClient.close();
+    }
   }
 }


[6/6] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42bcf2de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42bcf2de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42bcf2de

Branch: refs/heads/index_support
Commit: 42bcf2de090bf1bb5b5ec711427654056a2866e2
Parents: 86c97b2 9b3824b
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 01:35:19 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 01:35:19 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  10 +
 .../tajo/catalog/AbstractCatalogClient.java     | 638 +++++++------------
 .../org/apache/tajo/catalog/CatalogClient.java  |  49 +-
 .../java/org/apache/tajo/catalog/Schema.java    |  12 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   8 +-
 .../tajo/catalog/LocalCatalogWrapper.java       |  20 +-
 .../tajo/client/CatalogAdminClientImpl.java     | 518 ++++++++-------
 .../org/apache/tajo/client/QueryClientImpl.java | 469 ++++++++------
 .../apache/tajo/client/SessionConnection.java   | 351 +++++-----
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 -
 .../org/apache/tajo/master/QueryInProgress.java |  31 +-
 .../querymaster/QueryMasterManagerService.java  | 135 ++--
 .../tajo/worker/ExecutionBlockContext.java      |  32 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   1 +
 .../tajo/worker/TajoWorkerManagerService.java   |   2 +
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  43 +-
 .../src/main/proto/QueryMasterProtocol.proto    |  14 +-
 .../cli/tsql/TestDefaultCliOutputFormatter.java |   4 -
 .../tajo/engine/planner/TestLogicalPlanner.java |  23 +-
 .../tajo/engine/query/TestInsertQuery.java      |  19 +
 .../apache/tajo/querymaster/TestKillQuery.java  |  63 +-
 .../TestInsertQuery/nation_diff_col_order.ddl   |   1 +
 .../testInsertWithDifferentColumnOrder.sql      |   1 +
 .../testInsertWithDifferentColumnOrder.result   |  27 +
 .../org/apache/tajo/plan/LogicalPlanner.java    |   9 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  10 +-
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ---
 .../org/apache/tajo/rpc/RpcClientManager.java   |   9 +
 .../org/apache/tajo/rpc/ServerCallable.java     | 148 -----
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  39 --
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  14 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |  18 +-
 .../org/apache/tajo/storage/TestStorages.java   |  59 ++
 34 files changed, 1365 insertions(+), 1522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index b52b2f5,766f6c2..c872f8b
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -29,17 -29,12 +29,13 @@@ import org.apache.tajo.catalog.proto.Ca
  import org.apache.tajo.catalog.proto.CatalogProtos.*;
  import org.apache.tajo.common.TajoDataTypes.DataType;
  import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcClientManager;
- import org.apache.tajo.rpc.ServerCallable;
+ import org.apache.tajo.exception.InvalidOperationException;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
- import org.apache.tajo.service.ServiceTracker;
- import org.apache.tajo.service.ServiceTrackerFactory;
  import org.apache.tajo.util.ProtoUtil;
 +import org.apache.tajo.util.TUtil;
  
- import java.net.InetSocketAddress;
+ import java.io.Closeable;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -373,37 -269,14 +270,26 @@@ public abstract class AbstractCatalogCl
    }
  
    @Override
 +  public List<IndexDescProto> getAllIndexes() {
 +    try {
-       return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- 
-         @Override
-         public List<IndexDescProto> call(NettyClientBase client) throws Exception {
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
-           return response.getIndexList();
-         }
-       }.withRetries();
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++      return response.getIndexList();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
    public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
      try {
-       return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
- 
-           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-           builder.setDatabaseName(databaseName);
-           builder.setTableName(tableName);
+       TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+       builder.setDatabaseName(databaseName);
+       builder.setTableName(tableName);
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null,  builder.build()));
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
+       return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return null;
@@@ -637,42 -458,15 +471,42 @@@
    }
  
    @Override
 -  public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
 +  public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) {
 +    return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
 +  @Override
 +  public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) {
      try {
-       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public Boolean call(NettyClientBase client) throws ServiceException {
  
-           GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
-           builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-           for (String colunName : columnNames) {
-             builder.addColumnNames(colunName);
-           }
 -      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//<<<<<<< HEAD
++      GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+       builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -      builder.setColumnName(columnName);
++      for (String colunName : columnNames) {
++        builder.addColumnNames(colunName);
++      }
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return stub.existIndexByColumnNames(null, builder.build()).getValue();
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
 -      return stub.existIndexByColumn(null, builder.build()).getValue();
++      return stub.existIndexByColumnNames(null, builder.build()).getValue();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return false;
 +    }
 +  }
 +
 +  @Override
 +  public boolean existIndexesByTable(final String databaseName, final String tableName) {
 +    try {
-       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public Boolean call(NettyClientBase client) throws ServiceException {
- 
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
-         }
-       }.withRetries();
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
++//=======
++//      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++//      builder.setColumnName(columnName);
++//
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      return stub.existIndexByColumn(null, builder.build()).getValue();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return false;
@@@ -699,62 -488,17 +528,61 @@@
      }
    }
  
 +  private static String[] extractColumnNames(Column[] columns) {
 +    String[] columnNames = new String [columns.length];
 +    for (int i = 0; i < columnNames.length; i++) {
 +      columnNames[i] = columns[i].getSimpleName();
 +    }
 +    return columnNames;
 +  }
 +
 +  @Override
 +  public final IndexDesc getIndexByColumns(final String databaseName,
 +                                               final String tableName,
 +                                               final Column [] columns) {
 +    return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
    @Override
 -  public final IndexDesc getIndexByColumn(final String databaseName,
 -                                          final String tableName,
 -                                          final String columnName) {
 +  public final IndexDesc getIndexByColumnNames(final String databaseName,
 +                                           final String tableName,
 +                                           final String [] columnNames) {
      try {
-       return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public IndexDesc call(NettyClientBase client) throws ServiceException {
- 
-           GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
-           builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-           for (String columnName : columnNames) {
-             builder.addColumnNames(columnName);
-           }
 -      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++//      builder.setColumnName(columnName);
++//
++//<<<<<<< HEAD
++      GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+       builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -      builder.setColumnName(columnName);
++      for (String columnName : columnNames) {
++        builder.addColumnNames(columnName);
++      }
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
 -      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++      return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
 +  public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName,
 +                                                          final String tableName) {
 +    try {
-       return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         @Override
-         public Collection<IndexDesc> call(NettyClientBase client) throws Exception {
-           TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
-           List<IndexDesc> indexDescs = TUtil.newList();
-           for (IndexDescProto descProto : response.getIndexDescList()) {
-             indexDescs.add(new IndexDesc(descProto));
-           }
-           return indexDescs;
-         }
-       }.withRetries();
++      TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
++      List<IndexDesc> indexDescs = TUtil.newList();
++      for (IndexDescProto descProto : response.getIndexDescList()) {
++        indexDescs.add(new IndexDesc(descProto));
++      }
++      return indexDescs;
++//=======
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return null;
@@@ -781,6 -520,18 +604,21 @@@
        return false;
      }
    }
 -  
 -  @Override
 -  public List<IndexProto> getAllIndexes() {
 -    try {
 -      CatalogProtocolService.BlockingInterface stub = getStub();
 -      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
 -      return response.getIndexList();
 -    } catch (ServiceException e) {
 -      LOG.error(e.getMessage(), e);
 -      return new ArrayList<IndexProto>();
 -    }
 -  }
++//<<<<<<< HEAD
++//=======
++//
++//  @Override
++//  public List<IndexProto> getAllIndexes() {
++//    try {
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++//      return response.getIndexList();
++//    } catch (ServiceException e) {
++//      LOG.error(e.getMessage(), e);
++//      return new ArrayList<IndexProto>();
++//    }
++//  }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
    @Override
    public final boolean createFunction(final FunctionDesc funcDesc) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 5fa1c67,9397fcf..5a04892
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -20,16 -20,15 +20,15 @@@ package org.apache.tajo.client
  
  import com.google.protobuf.ServiceException;
  import org.apache.tajo.annotation.Nullable;
 -import org.apache.tajo.catalog.CatalogUtil;
 -import org.apache.tajo.catalog.Schema;
 -import org.apache.tajo.catalog.TableDesc;
 -import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.*;
  import org.apache.tajo.catalog.partition.PartitionMethodDesc;
  import org.apache.tajo.catalog.proto.CatalogProtos;
 +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
  import org.apache.tajo.ipc.ClientProtos;
 +import org.apache.tajo.ipc.ClientProtos.*;
 +import org.apache.tajo.ipc.TajoMasterClientProtocol;
  import org.apache.tajo.jdbc.SQLStates;
  import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.ServerCallable;
  
  import java.io.IOException;
  import java.net.URI;
@@@ -132,32 -97,25 +97,57 @@@ public class CatalogAdminClientImpl imp
                                         final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
        throws SQLException, ServiceException {
  
-     return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setName(tableName);
-         builder.setSchema(schema.getProto());
-         builder.setMeta(meta.getProto());
-         builder.setPath(path.toString());
-         if (partitionMethodDesc != null) {
-           builder.setPartition(partitionMethodDesc.getProto());
-         }
-         ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return CatalogUtil.newTableDesc(res.getTableDesc());
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-         }
-       }
- 
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setName(tableName);
+     builder.setSchema(schema.getProto());
+     builder.setMeta(meta.getProto());
+     builder.setPath(path.toString());
+     if (partitionMethodDesc != null) {
+       builder.setPartition(partitionMethodDesc.getProto());
+     }
+     ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return CatalogUtil.newTableDesc(res.getTableDesc());
+     } else {
 -      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++      throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+     }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
++//    builder.setSessionId(connection.sessionId);
++//    builder.setName(tableName);
++//    builder.setSchema(schema.getProto());
++//    builder.setMeta(meta.getProto());
++//    builder.setPath(path.toString());
++//    if (partitionMethodDesc != null) {
++//      builder.setPartition(partitionMethodDesc.getProto());
++//    }
++//    ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return CatalogUtil.newTableDesc(res.getTableDesc());
++//    } else {
++//      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
@@@ -211,169 -156,37 +188,222 @@@
  
    @Override
    public TableDesc getTableDesc(final String tableName) throws ServiceException {
- 
-     return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return CatalogUtil.newTableDesc(res.getTableDesc());
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-         }
-       }
--
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setTableName(tableName);
+     ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return CatalogUtil.newTableDesc(res.getTableDesc());
+     } else {
 -      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++      throw new ServiceException(new SQLException(res.getResult().getErrorMessage(),
++          SQLStates.ER_NO_SUCH_TABLE.getState()));
+     }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
++//    builder.setSessionId(connection.sessionId);
++//    builder.setTableName(tableName);
++//    ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return CatalogUtil.newTableDesc(res.getTableDesc());
++//    } else {
++//      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
    public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- 
-     return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         String paramFunctionName = functionName == null ? "" : functionName;
-         ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
-             connection.convertSessionedString(paramFunctionName));
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return res.getFunctionsList();
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage());
-         }
-       }
--
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     String paramFunctionName = functionName == null ? "" : functionName;
+     ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+         connection.convertSessionedString(paramFunctionName));
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return res.getFunctionsList();
+     } else {
 -      throw new ServiceException(new SQLException(res.getErrorMessage()));
++      throw new ServiceException(res.getResult().getErrorMessage());
++    }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    String paramFunctionName = functionName == null ? "" : functionName;
++//    ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
++//        connection.convertSessionedString(paramFunctionName));
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return res.getFunctionsList();
++//    } else {
++//      throw new ServiceException(new SQLException(res.getErrorMessage()));
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public IndexDescProto call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.getIndexWithName(null,
-             connection.convertSessionedString(indexName));
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.getIndexWithName(null,
++        connection.convertSessionedString(indexName));
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.existIndexWithName(null,
-             connection.convertSessionedString(indexName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.existIndexWithName(null,
++        connection.convertSessionedString(indexName)).getValue();
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<List<IndexDescProto>>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public List<IndexDescProto> call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
-             connection.convertSessionedString(tableName));
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           return response.getIndexesList();
-         } else {
-           throw new SQLException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
++        connection.convertSessionedString(tableName));
++    if (response.getResult().getResultCode() == ResultCode.OK) {
++      return response.getIndexesList();
++    } else {
++      throw new ServiceException(response.getResult().getErrorMessage());
++    }
++//    return new ServerCallable<List<IndexDescProto>>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public List<IndexDescProto> call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean hasIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.existIndexesForTable(null,
-             connection.convertSessionedString(tableName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.existIndexesForTable(null,
++        connection.convertSessionedString(tableName)).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public IndexDescProto call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         for (String eachColumnName : columnNames) {
-           builder.addColumnNames(eachColumnName);
-         }
-         GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           return response.getIndexDesc();
-         } else {
-           throw new SQLException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++    builder.setSessionId(connection.sessionId);
++    builder.setTableName(tableName);
++    for (String eachColumnName : columnNames) {
++      builder.addColumnNames(eachColumnName);
++    }
++    GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
++    if (response.getResult().getResultCode() == ResultCode.OK) {
++      return response.getIndexDesc();
++    } else {
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
++
++//    return new ServerCallable<IndexDescProto>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public IndexDescProto call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         for (String eachColumnName : columnName) {
-           builder.addColumnNames(eachColumnName);
-         }
-         return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++    builder.setSessionId(connection.sessionId);
++    builder.setTableName(tableName);
++    for (String eachColumnName : columnName) {
++      builder.addColumnNames(eachColumnName);
++    }
++    return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean dropIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.dropIndex(null,
-             connection.convertSessionedString(indexName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.dropIndex(null,
++        connection.convertSessionedString(indexName)).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 73abc4c,53889fe..007c010
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@@ -153,28 -153,21 +153,40 @@@ public class QueryClientImpl implement
  
    @Override
    public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
- 
-     return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
- 
-         final QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(sql);
-         builder.setIsJson(false);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
- 
-         SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-         }
-         return response;
-       }
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+ 
+     final QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(sql);
+     builder.setIsJson(false);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 -
 -
+     SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+     }
+     return response;
++
++//<<<<<<< HEAD
++//        connection.checkSessionAndGet(client);
++//
++//        final QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(sql);
++//        builder.setIsJson(false);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//
++//
++//      }
++//    }.withRetries();
++//=======
++//    SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
++//    if (response.getResultCode() == ResultCode.OK) {
++//      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//    }
++//    return response;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
@@@ -369,42 -356,26 +375,60 @@@
        throws ServiceException {
  
      try {
-       final ServerCallable<ClientProtos.SerializedResultSet> callable =
-           new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
-               TajoMasterClientProtocol.class, false) {
- 
-             public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
- 
-               connection.checkSessionAndGet(client);
-               TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-               GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
-               builder.setSessionId(connection.sessionId);
-               builder.setQueryId(queryId.getProto());
-               builder.setFetchRowNum(fetchRowNum);
-               try {
-                 GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
-                 if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
-                   abort();
-                   throw new ServiceException(response.getResult().getErrorMessage());
-                 }
- 
-                 return response.getResultSet();
-               } catch (ServiceException e) {
-                 abort();
-                 throw e;
-               } catch (Throwable t) {
-                 throw new ServiceException(t.getMessage(), t);
-               }
-             }
-           };
- 
-       ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//<<<<<<< HEAD
++//      final ServerCallable<ClientProtos.SerializedResultSet> callable =
++//          new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
++//              TajoMasterClientProtocol.class, false) {
++//
++//            public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
++//
++//              connection.checkSessionAndGet(client);
++//              TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//              GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
++//              builder.setSessionId(connection.sessionId);
++//              builder.setQueryId(queryId.getProto());
++//              builder.setFetchRowNum(fetchRowNum);
++//              try {
++//                GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
++//                if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++//                  abort();
++//                  throw new ServiceException(response.getResult().getErrorMessage());
++//                }
++//
++//                return response.getResultSet();
++//              } catch (ServiceException e) {
++//                abort();
++//                throw e;
++//              } catch (Throwable t) {
++//                throw new ServiceException(t.getMessage(), t);
++//              }
++//            }
++//          };
++//
++//      ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//=======
+       NettyClientBase client = connection.getTajoMasterConnection();
+       connection.checkSessionAndGet(client);
+       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+       GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+       builder.setSessionId(connection.sessionId);
+       builder.setQueryId(queryId.getProto());
+       builder.setFetchRowNum(fetchRowNum);
+ 
+       GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
 -      if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
 -        throw new ServiceException(response.getErrorMessage());
++      if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++        throw new ServiceException(response.getResult().getErrorMessage());
+       }
+ 
+       ClientProtos.SerializedResultSet resultSet = response.getResultSet();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
        return new TajoMemoryResultSet(queryId,
-           new Schema(serializedResultSet.getSchema()),
-           serializedResultSet.getSerializedTuplesList(),
-           serializedResultSet.getSerializedTuplesCount(),
+           new Schema(resultSet.getSchema()),
+           resultSet.getSerializedTuplesList(),
+           resultSet.getSerializedTuplesCount(),
            getClientSideSessionVars());
      } catch (ServiceException e) {
        throw e;
@@@ -416,59 -387,47 +440,92 @@@
    @Override
    public boolean updateQuery(final String sql) throws ServiceException {
  
-     return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-         QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(sql);
-         builder.setIsJson(false);
-         ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- 
-         if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return true;
-         } else {
-           if (response.getResult().hasErrorMessage()) {
-             System.err.println("ERROR: " + response.getResult().getErrorMessage());
-           }
-           return false;
-         }
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+     QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(sql);
+     builder.setIsJson(false);
+     ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+ 
 -    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++//<<<<<<< HEAD
++//        connection.checkSessionAndGet(client);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//        QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(sql);
++//        builder.setIsJson(false);
++//        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//
++//        if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++//          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return true;
++//        } else {
++//          if (response.getResult().hasErrorMessage()) {
++//            System.err.println("ERROR: " + response.getResult().getErrorMessage());
++//          }
++//          return false;
++//        }
++//=======
++    if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return true;
+     } else {
 -      if (response.hasErrorMessage()) {
 -        LOG.error("ERROR: " + response.getErrorMessage());
++      if (response.getResult().hasErrorMessage()) {
++        LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       return false;
+     }
    }
  
    @Override
    public boolean updateQueryWithJson(final String json) throws ServiceException {
  
-     return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-         QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(json);
-         builder.setIsJson(true);
-         ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-         if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return true;
-         } else {
-           if (response.getResult().hasErrorMessage()) {
-             System.err.println("ERROR: " + response.getResult().getErrorMessage());
-           }
-           return false;
-         }
++//<<<<<<< HEAD
++//    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public Boolean call(NettyClientBase client) throws ServiceException {
++//
++//        connection.checkSessionAndGet(client);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//        QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(json);
++//        builder.setIsJson(true);
++//        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//        if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++//          return true;
++//        } else {
++//          if (response.getResult().hasErrorMessage()) {
++//            System.err.println("ERROR: " + response.getResult().getErrorMessage());
++//          }
++//          return false;
++//        }
++//=======
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+     QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(json);
+     builder.setIsJson(true);
+     ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
 -    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return true;
+     } else {
 -      if (response.hasErrorMessage()) {
 -        LOG.error("ERROR: " + response.getErrorMessage());
++      if (response.getResult().hasErrorMessage()) {
++        LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       return false;
+     }
    }
  
    @Override
@@@ -581,25 -519,20 +617,42 @@@
    }
    
    public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
-     return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
-       public QueryInfoProto call(NettyClientBase client) throws ServiceException {
-         connection.checkSessionAndGet(client);
- 
-         QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQueryId(queryId.getProto());
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
-         if (res.getResult().getResultCode() == ResultCode.OK) {
-           return res.getQueryInfo();
-         } else {
-           abort();
-           throw new ServiceException(res.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++//<<<<<<< HEAD
++//    return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//      public QueryInfoProto call(NettyClientBase client) throws ServiceException {
++//        connection.checkSessionAndGet(client);
++//
++//        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQueryId(queryId.getProto());
++//
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
++//        if (res.getResult().getResultCode() == ResultCode.OK) {
++//          return res.getQueryInfo();
++//        } else {
++//          abort();
++//          throw new ServiceException(res.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//=======
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+ 
+     QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQueryId(queryId.getProto());
+ 
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
 -    if (res.getResultCode() == ResultCode.OK) {
++    if (res.getResult().getResultCode() == ResultCode.OK) {
+       return res.getQueryInfo();
+     } else {
 -      throw new ServiceException(res.getErrorMessage());
++      throw new ServiceException(res.getResult().getErrorMessage());
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@@ -611,24 -544,31 +664,42 @@@
      InetSocketAddress qmAddress = new InetSocketAddress(
          queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
  
-     return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
-         QueryMasterClientProtocol.class, false) {
-       public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
-         connection.checkSessionAndGet(client);
+     RpcClientManager manager = RpcClientManager.getInstance();
+     NettyClientBase queryMasterClient;
+     try {
+       queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+           manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+     } catch (Exception e) {
+       throw new ServiceException(e);
+     }
  
-         QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQueryId(queryId.getProto());
+     try {
+       connection.checkSessionAndGet(connection.getTajoMasterConnection());
+ 
++//<<<<<<< HEAD
++//        QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
++//        GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
++//        if (res.getResult().getResultCode() == ResultCode.OK) {
++//          return res.getQueryHistory();
++//        } else {
++//          abort();
++//          throw new ServiceException(res.getResult().getErrorMessage());
++//        }
++//=======
+       QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+       builder.setSessionId(connection.sessionId);
+       builder.setQueryId(queryId.getProto());
  
-         QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
-         GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
-         if (res.getResult().getResultCode() == ResultCode.OK) {
-           return res.getQueryHistory();
-         } else {
-           abort();
-           throw new ServiceException(res.getResult().getErrorMessage());
-         }
+       QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+       GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
 -      if (res.getResultCode() == ResultCode.OK) {
++      if (res.getResult().getResultCode() == ResultCode.OK) {
+         return res.getQueryHistory();
+       } else {
 -        throw new ServiceException(res.getErrorMessage());
++        throw new ServiceException(res.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+     } finally {
+       queryMasterClient.close();
+     }
    }
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bb15981,84decd5..1bb0e16
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@@ -157,52 -160,43 +160,81 @@@ public class SessionConnection implemen
    }
  
    public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-     return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Map<String, String> call(NettyClientBase client) throws ServiceException {
-         checkSessionAndGet(client);
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         KeyValueSet keyValueSet = new KeyValueSet();
-         keyValueSet.putAll(variables);
-         ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-             .setSessionId(sessionId)
-             .setSessionVars(keyValueSet.getProto()).build();
- 
-         SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
- 
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return Collections.unmodifiableMap(sessionVarsCache);
-         } else {
-           throw new ServiceException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++//<<<<<<< HEAD
++//    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public Map<String, String> call(NettyClientBase client) throws ServiceException {
++//        checkSessionAndGet(client);
++//
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        KeyValueSet keyValueSet = new KeyValueSet();
++//        keyValueSet.putAll(variables);
++//        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++//            .setSessionId(sessionId)
++//            .setSessionVars(keyValueSet.getProto()).build();
++//
++//        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
++//
++//        if (response.getResult().getResultCode() == ResultCode.OK) {
++//          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return Collections.unmodifiableMap(sessionVarsCache);
++//        } else {
++//          throw new ServiceException(response.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//  }
++//=======
+     NettyClientBase client = getTajoMasterConnection();
+     checkSessionAndGet(client);
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+ 
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     KeyValueSet keyValueSet = new KeyValueSet();
+     keyValueSet.putAll(variables);
+     ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+         .setSessionId(sessionId)
+         .setSessionVars(keyValueSet.getProto()).build();
+ 
+     SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+ 
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return Collections.unmodifiableMap(sessionVarsCache);
+     } else {
 -      throw new ServiceException(response.getMessage());
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
    }
  
-   public Map<String, String> unsetSessionVariables(final List<String> variables)  throws ServiceException {
-     return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public Map<String, String> call(NettyClientBase client) throws ServiceException {
-         checkSessionAndGet(client);
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-             .setSessionId(sessionId)
-             .addAllUnsetVariables(variables).build();
- 
-         SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
- 
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return Collections.unmodifiableMap(sessionVarsCache);
-         } else {
-           throw new ServiceException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
+   public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+     NettyClientBase client = getTajoMasterConnection();
+     checkSessionAndGet(client);
+ 
++//<<<<<<< HEAD
++//        if (response.getResult().getResultCode() == ResultCode.OK) {
++//          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return Collections.unmodifiableMap(sessionVarsCache);
++//        } else {
++//          throw new ServiceException(response.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//=======
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+         .setSessionId(sessionId)
+         .addAllUnsetVariables(variables).build();
+ 
+     SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+ 
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return Collections.unmodifiableMap(sessionVarsCache);
+     } else {
 -      throw new ServiceException(response.getMessage());
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    void updateSessionVarsCache(Map<String, String> variables) {
@@@ -333,55 -308,51 +346,81 @@@
    }
  
    public boolean reconnect() throws Exception {
-     return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
-         CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
-         builder.setUsername(userInfo.getUserName()).build();
-         if (baseDatabase != null) {
-           builder.setBaseDatabaseName(baseDatabase);
-         }
- 
+     CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+     builder.setUsername(userInfo.getUserName()).build();
+     if (baseDatabase != null) {
+       builder.setBaseDatabaseName(baseDatabase);
+     }
  
-         // create new session
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
-         if (response.getResult().getResultCode() != ResultCode.OK) {
-           return false;
-         }
+     NettyClientBase client = getTajoMasterConnection();
+ 
++//<<<<<<< HEAD
++//        // create new session
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
++//        if (response.getResult().getResultCode() != ResultCode.OK) {
++//          return false;
++//        }
++//=======
+     // create new session
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
 -    if (response.getResultCode() != ResultCode.OK) {
++    if (response.getResult().getResultCode() != ResultCode.OK) {
+       return false;
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
-         // Invalidate some session variables in client cache
-         sessionId = response.getSessionId();
-         Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
-         synchronized (sessionVarsCache) {
-           for (SessionVars var : UPDATE_ON_RECONNECT) {
-             String value = sessionVars.get(var.keyname());
-             if (value != null) {
-               sessionVarsCache.put(var.keyname(), value);
-             }
-           }
+     // Invalidate some session variables in client cache
+     sessionId = response.getSessionId();
+     Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+     synchronized (sessionVarsCache) {
+       for (SessionVars var : UPDATE_ON_RECONNECT) {
+         String value = sessionVars.get(var.keyname());
+         if (value != null) {
+           sessionVarsCache.put(var.keyname(), value);
          }
+       }
+     }
  
-         // Update the session variables in server side
-         try {
-           KeyValueSet keyValueSet = new KeyValueSet();
-           keyValueSet.putAll(sessionVarsCache);
-           ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-               .setSessionId(sessionId)
-               .setSessionVars(keyValueSet.getProto()).build();
- 
-           if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
-             tajoMasterService.removeSession(null, sessionId);
-             return false;
-           }
-           LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
-           return true;
-         } catch (ServiceException e) {
-           tajoMasterService.removeSession(null, sessionId);
-           return false;
-         }
++//<<<<<<< HEAD
++//        // Update the session variables in server side
++//        try {
++//          KeyValueSet keyValueSet = new KeyValueSet();
++//          keyValueSet.putAll(sessionVarsCache);
++//          ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++//              .setSessionId(sessionId)
++//              .setSessionVars(keyValueSet.getProto()).build();
++//
++//          if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
++//            tajoMasterService.removeSession(null, sessionId);
++//            return false;
++//          }
++//          LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
++//          return true;
++//        } catch (ServiceException e) {
++//          tajoMasterService.removeSession(null, sessionId);
++//          return false;
++//        }
++//=======
+     // Update the session variables in server side
+     try {
+       KeyValueSet keyValueSet = new KeyValueSet();
+       keyValueSet.putAll(sessionVarsCache);
+       ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+           .setSessionId(sessionId)
+           .setSessionVars(keyValueSet.getProto()).build();
+ 
 -      if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
++      if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
+         tajoMasterService.removeSession(null, sessionId);
+         return false;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+       return true;
+     } catch (ServiceException e) {
+       tajoMasterService.removeSession(null, sessionId);
+       return false;
+     }
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index be1cdf2,b1a27fa..7ad658f
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -157,24 -153,55 +153,55 @@@ public class TestKillQuery 
    @Test
    public final void testIgnoreStageStateFromKilled() throws Exception {
  
-     ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
-     QueryId queryId = new QueryId(res.getQueryId());
-     cluster.waitForQuerySubmitted(queryId);
+     SQLAnalyzer analyzer = new SQLAnalyzer();
+     QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+     Session session = LocalTajoTestingUtility.createDummySession();
+     CatalogService catalog = cluster.getMaster().getCatalog();
+ 
+     LogicalPlanner planner = new LogicalPlanner(catalog);
 -    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+     Expr expr =  analyzer.parse(queryStr);
+     LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ 
+     optimizer.optimize(plan);
+ 
+     QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+     QueryContext queryContext = new QueryContext(conf);
+     MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+     GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+     globalPlanner.build(masterPlan);
  
-     QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
-     Query query = qmt.getQuery();
+     CountDownLatch barrier  = new CountDownLatch(1);
+     MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+ 
+     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+         queryId, session, defaultContext, expr.toJson(), dispatch);
  
-     // wait for a stage created
-     cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
-     query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+     queryMasterTask.init(conf);
+     queryMasterTask.getQueryTaskContext().getDispatcher().start();
+     queryMasterTask.startQuery();
  
      try{
-       cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
-     } finally {
-       assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+       barrier.await(5000, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+       fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+     }
+ 
+     Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+     assertNotNull(stage);
+ 
+     // fire kill event
+     queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+ 
+     try {
+       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+       assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+     }   finally {
+       queryMasterTask.stop();
      }
  
-     List<Stage> stages = Lists.newArrayList(query.getStages());
+     List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
      Stage lastStage = stages.get(stages.size() - 1);
  
      assertEquals(StageState.KILLED, lastStage.getSynchronizedState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------