You are viewing a plain-text version of this content. The canonical link was a hyperlink that is not preserved in this plain-text version.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:05 UTC

[6/6] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42bcf2de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42bcf2de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42bcf2de

Branch: refs/heads/index_support
Commit: 42bcf2de090bf1bb5b5ec711427654056a2866e2
Parents: 86c97b2 9b3824b
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 01:35:19 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 01:35:19 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  10 +
 .../tajo/catalog/AbstractCatalogClient.java     | 638 +++++++------------
 .../org/apache/tajo/catalog/CatalogClient.java  |  49 +-
 .../java/org/apache/tajo/catalog/Schema.java    |  12 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   8 +-
 .../tajo/catalog/LocalCatalogWrapper.java       |  20 +-
 .../tajo/client/CatalogAdminClientImpl.java     | 518 ++++++++-------
 .../org/apache/tajo/client/QueryClientImpl.java | 469 ++++++++------
 .../apache/tajo/client/SessionConnection.java   | 351 +++++-----
 .../java/org/apache/tajo/conf/TajoConf.java     |   2 -
 .../org/apache/tajo/master/QueryInProgress.java |  31 +-
 .../querymaster/QueryMasterManagerService.java  | 135 ++--
 .../tajo/worker/ExecutionBlockContext.java      |  32 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   1 +
 .../tajo/worker/TajoWorkerManagerService.java   |   2 +
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  43 +-
 .../src/main/proto/QueryMasterProtocol.proto    |  14 +-
 .../cli/tsql/TestDefaultCliOutputFormatter.java |   4 -
 .../tajo/engine/planner/TestLogicalPlanner.java |  23 +-
 .../tajo/engine/query/TestInsertQuery.java      |  19 +
 .../apache/tajo/querymaster/TestKillQuery.java  |  63 +-
 .../TestInsertQuery/nation_diff_col_order.ddl   |   1 +
 .../testInsertWithDifferentColumnOrder.sql      |   1 +
 .../testInsertWithDifferentColumnOrder.result   |  27 +
 .../org/apache/tajo/plan/LogicalPlanner.java    |   9 +-
 .../org/apache/tajo/rpc/NettyClientBase.java    |  10 +-
 .../tajo/rpc/RetriesExhaustedException.java     | 104 ---
 .../org/apache/tajo/rpc/RpcClientManager.java   |   9 +
 .../org/apache/tajo/rpc/ServerCallable.java     | 148 -----
 .../org/apache/tajo/rpc/TestBlockingRpc.java    |  39 --
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  14 +-
 .../tajo/storage/text/CSVLineDeserializer.java  |  18 +-
 .../org/apache/tajo/storage/TestStorages.java   |  59 ++
 34 files changed, 1365 insertions(+), 1522 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index b52b2f5,766f6c2..c872f8b
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -29,17 -29,12 +29,13 @@@ import org.apache.tajo.catalog.proto.Ca
  import org.apache.tajo.catalog.proto.CatalogProtos.*;
  import org.apache.tajo.common.TajoDataTypes.DataType;
  import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcClientManager;
- import org.apache.tajo.rpc.ServerCallable;
+ import org.apache.tajo.exception.InvalidOperationException;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
- import org.apache.tajo.service.ServiceTracker;
- import org.apache.tajo.service.ServiceTrackerFactory;
  import org.apache.tajo.util.ProtoUtil;
 +import org.apache.tajo.util.TUtil;
  
- import java.net.InetSocketAddress;
+ import java.io.Closeable;
  import java.util.ArrayList;
  import java.util.Collection;
  import java.util.List;
@@@ -373,37 -269,14 +270,26 @@@ public abstract class AbstractCatalogCl
    }
  
    @Override
 +  public List<IndexDescProto> getAllIndexes() {
 +    try {
-       return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- 
-         @Override
-         public List<IndexDescProto> call(NettyClientBase client) throws Exception {
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
-           return response.getIndexList();
-         }
-       }.withRetries();
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++      return response.getIndexList();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
    public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
      try {
-       return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
- 
-           TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
-           builder.setDatabaseName(databaseName);
-           builder.setTableName(tableName);
+       TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+       builder.setDatabaseName(databaseName);
+       builder.setTableName(tableName);
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null,  builder.build()));
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
+       return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return null;
@@@ -637,42 -458,15 +471,42 @@@
    }
  
    @Override
 -  public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
 +  public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) {
 +    return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
 +  @Override
 +  public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) {
      try {
-       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public Boolean call(NettyClientBase client) throws ServiceException {
  
-           GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
-           builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-           for (String colunName : columnNames) {
-             builder.addColumnNames(colunName);
-           }
 -      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//<<<<<<< HEAD
++      GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+       builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -      builder.setColumnName(columnName);
++      for (String colunName : columnNames) {
++        builder.addColumnNames(colunName);
++      }
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return stub.existIndexByColumnNames(null, builder.build()).getValue();
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
 -      return stub.existIndexByColumn(null, builder.build()).getValue();
++      return stub.existIndexByColumnNames(null, builder.build()).getValue();
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return false;
 +    }
 +  }
 +
 +  @Override
 +  public boolean existIndexesByTable(final String databaseName, final String tableName) {
 +    try {
-       return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public Boolean call(NettyClientBase client) throws ServiceException {
- 
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
-         }
-       }.withRetries();
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
++//=======
++//      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++//      builder.setColumnName(columnName);
++//
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      return stub.existIndexByColumn(null, builder.build()).getValue();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return false;
@@@ -699,62 -488,17 +528,61 @@@
      }
    }
  
 +  private static String[] extractColumnNames(Column[] columns) {
 +    String[] columnNames = new String [columns.length];
 +    for (int i = 0; i < columnNames.length; i++) {
 +      columnNames[i] = columns[i].getSimpleName();
 +    }
 +    return columnNames;
 +  }
 +
 +  @Override
 +  public final IndexDesc getIndexByColumns(final String databaseName,
 +                                               final String tableName,
 +                                               final Column [] columns) {
 +    return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
 +  }
 +
    @Override
 -  public final IndexDesc getIndexByColumn(final String databaseName,
 -                                          final String tableName,
 -                                          final String columnName) {
 +  public final IndexDesc getIndexByColumnNames(final String databaseName,
 +                                           final String tableName,
 +                                           final String [] columnNames) {
      try {
-       return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         public IndexDesc call(NettyClientBase client) throws ServiceException {
- 
-           GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
-           builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
-           for (String columnName : columnNames) {
-             builder.addColumnNames(columnName);
-           }
 -      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//      builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++//      builder.setColumnName(columnName);
++//
++//<<<<<<< HEAD
++      GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+       builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
 -      builder.setColumnName(columnName);
++      for (String columnName : columnNames) {
++        builder.addColumnNames(columnName);
++      }
  
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
-         }
-       }.withRetries();
+       CatalogProtocolService.BlockingInterface stub = getStub();
 -      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++      return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
 +    } catch (ServiceException e) {
 +      LOG.error(e.getMessage(), e);
 +      return null;
 +    }
 +  }
 +
 +  @Override
 +  public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName,
 +                                                          final String tableName) {
 +    try {
-       return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-         @Override
-         public Collection<IndexDesc> call(NettyClientBase client) throws Exception {
-           TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
-           CatalogProtocolService.BlockingInterface stub = getStub(client);
-           GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
-           List<IndexDesc> indexDescs = TUtil.newList();
-           for (IndexDescProto descProto : response.getIndexDescList()) {
-             indexDescs.add(new IndexDesc(descProto));
-           }
-           return indexDescs;
-         }
-       }.withRetries();
++      TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
++      CatalogProtocolService.BlockingInterface stub = getStub();
++      GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
++      List<IndexDesc> indexDescs = TUtil.newList();
++      for (IndexDescProto descProto : response.getIndexDescList()) {
++        indexDescs.add(new IndexDesc(descProto));
++      }
++      return indexDescs;
++//=======
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
      } catch (ServiceException e) {
        LOG.error(e.getMessage(), e);
        return null;
@@@ -781,6 -520,18 +604,21 @@@
        return false;
      }
    }
 -  
 -  @Override
 -  public List<IndexProto> getAllIndexes() {
 -    try {
 -      CatalogProtocolService.BlockingInterface stub = getStub();
 -      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
 -      return response.getIndexList();
 -    } catch (ServiceException e) {
 -      LOG.error(e.getMessage(), e);
 -      return new ArrayList<IndexProto>();
 -    }
 -  }
++//<<<<<<< HEAD
++//=======
++//
++//  @Override
++//  public List<IndexProto> getAllIndexes() {
++//    try {
++//      CatalogProtocolService.BlockingInterface stub = getStub();
++//      GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++//      return response.getIndexList();
++//    } catch (ServiceException e) {
++//      LOG.error(e.getMessage(), e);
++//      return new ArrayList<IndexProto>();
++//    }
++//  }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
    @Override
    public final boolean createFunction(final FunctionDesc funcDesc) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 5fa1c67,9397fcf..5a04892
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -20,16 -20,15 +20,15 @@@ package org.apache.tajo.client
  
  import com.google.protobuf.ServiceException;
  import org.apache.tajo.annotation.Nullable;
 -import org.apache.tajo.catalog.CatalogUtil;
 -import org.apache.tajo.catalog.Schema;
 -import org.apache.tajo.catalog.TableDesc;
 -import org.apache.tajo.catalog.TableMeta;
 +import org.apache.tajo.catalog.*;
  import org.apache.tajo.catalog.partition.PartitionMethodDesc;
  import org.apache.tajo.catalog.proto.CatalogProtos;
 +import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
  import org.apache.tajo.ipc.ClientProtos;
 +import org.apache.tajo.ipc.ClientProtos.*;
 +import org.apache.tajo.ipc.TajoMasterClientProtocol;
  import org.apache.tajo.jdbc.SQLStates;
  import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.ServerCallable;
  
  import java.io.IOException;
  import java.net.URI;
@@@ -132,32 -97,25 +97,57 @@@ public class CatalogAdminClientImpl imp
                                         final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
        throws SQLException, ServiceException {
  
-     return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setName(tableName);
-         builder.setSchema(schema.getProto());
-         builder.setMeta(meta.getProto());
-         builder.setPath(path.toString());
-         if (partitionMethodDesc != null) {
-           builder.setPartition(partitionMethodDesc.getProto());
-         }
-         ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return CatalogUtil.newTableDesc(res.getTableDesc());
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-         }
-       }
- 
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setName(tableName);
+     builder.setSchema(schema.getProto());
+     builder.setMeta(meta.getProto());
+     builder.setPath(path.toString());
+     if (partitionMethodDesc != null) {
+       builder.setPartition(partitionMethodDesc.getProto());
+     }
+     ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return CatalogUtil.newTableDesc(res.getTableDesc());
+     } else {
 -      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++      throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+     }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
++//    builder.setSessionId(connection.sessionId);
++//    builder.setName(tableName);
++//    builder.setSchema(schema.getProto());
++//    builder.setMeta(meta.getProto());
++//    builder.setPath(path.toString());
++//    if (partitionMethodDesc != null) {
++//      builder.setPartition(partitionMethodDesc.getProto());
++//    }
++//    ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return CatalogUtil.newTableDesc(res.getTableDesc());
++//    } else {
++//      throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
@@@ -211,169 -156,37 +188,222 @@@
  
    @Override
    public TableDesc getTableDesc(final String tableName) throws ServiceException {
- 
-     return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return CatalogUtil.newTableDesc(res.getTableDesc());
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-         }
-       }
--
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setTableName(tableName);
+     ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return CatalogUtil.newTableDesc(res.getTableDesc());
+     } else {
 -      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++      throw new ServiceException(new SQLException(res.getResult().getErrorMessage(),
++          SQLStates.ER_NO_SUCH_TABLE.getState()));
+     }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
++//    builder.setSessionId(connection.sessionId);
++//    builder.setTableName(tableName);
++//    ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return CatalogUtil.newTableDesc(res.getTableDesc());
++//    } else {
++//      throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
    public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- 
-     return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
- 
-         connection.checkSessionAndGet(client);
-         BlockingInterface tajoMasterService = client.getStub();
- 
-         String paramFunctionName = functionName == null ? "" : functionName;
-         ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
-             connection.convertSessionedString(paramFunctionName));
-         if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return res.getFunctionsList();
-         } else {
-           throw new SQLException(res.getResult().getErrorMessage());
-         }
-       }
--
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     BlockingInterface tajoMasterService = client.getStub();
+ 
+     String paramFunctionName = functionName == null ? "" : functionName;
+     ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+         connection.convertSessionedString(paramFunctionName));
 -    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return res.getFunctionsList();
+     } else {
 -      throw new ServiceException(new SQLException(res.getErrorMessage()));
++      throw new ServiceException(res.getResult().getErrorMessage());
++    }
++
++//<<<<<<< HEAD
++//    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++//      }
++//
++//    }.withRetries();
++//=======
++//    NettyClientBase client = connection.getTajoMasterConnection();
++//    connection.checkSessionAndGet(client);
++//    BlockingInterface tajoMasterService = client.getStub();
++//
++//    String paramFunctionName = functionName == null ? "" : functionName;
++//    ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
++//        connection.convertSessionedString(paramFunctionName));
++//    if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++//      return res.getFunctionsList();
++//    } else {
++//      throw new ServiceException(new SQLException(res.getErrorMessage()));
++//    }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public IndexDescProto call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.getIndexWithName(null,
-             connection.convertSessionedString(indexName));
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.getIndexWithName(null,
++        connection.convertSessionedString(indexName));
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.existIndexWithName(null,
-             connection.convertSessionedString(indexName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.existIndexWithName(null,
++        connection.convertSessionedString(indexName)).getValue();
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<List<IndexDescProto>>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public List<IndexDescProto> call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
-             connection.convertSessionedString(tableName));
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           return response.getIndexesList();
-         } else {
-           throw new SQLException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
++        connection.convertSessionedString(tableName));
++    if (response.getResult().getResultCode() == ResultCode.OK) {
++      return response.getIndexesList();
++    } else {
++      throw new ServiceException(response.getResult().getErrorMessage());
++    }
++//    return new ServerCallable<List<IndexDescProto>>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public List<IndexDescProto> call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean hasIndexes(final String tableName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.existIndexesForTable(null,
-             connection.convertSessionedString(tableName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.existIndexesForTable(null,
++        connection.convertSessionedString(tableName)).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException {
-     return new ServerCallable<IndexDescProto>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public IndexDescProto call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         for (String eachColumnName : columnNames) {
-           builder.addColumnNames(eachColumnName);
-         }
-         GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           return response.getIndexDesc();
-         } else {
-           throw new SQLException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++    builder.setSessionId(connection.sessionId);
++    builder.setTableName(tableName);
++    for (String eachColumnName : columnNames) {
++      builder.addColumnNames(eachColumnName);
++    }
++    GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
++    if (response.getResult().getResultCode() == ResultCode.OK) {
++      return response.getIndexDesc();
++    } else {
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
++
++//    return new ServerCallable<IndexDescProto>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public IndexDescProto call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setTableName(tableName);
-         for (String eachColumnName : columnName) {
-           builder.addColumnNames(eachColumnName);
-         }
-         return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++    builder.setSessionId(connection.sessionId);
++    builder.setTableName(tableName);
++    for (String eachColumnName : columnName) {
++      builder.addColumnNames(eachColumnName);
++    }
++    return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
 +  }
 +
 +  @Override
 +  public boolean dropIndex(final String indexName) throws ServiceException {
-     return new ServerCallable<Boolean>(connection.manager,
-         connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       @Override
-       public Boolean call(NettyClientBase client) throws Exception {
-         BlockingInterface tajoMasterService = client.getStub();
-         return tajoMasterService.dropIndex(null,
-             connection.convertSessionedString(indexName)).getValue();
-       }
-     }.withRetries();
++    NettyClientBase client = connection.getTajoMasterConnection();
++    connection.checkSessionAndGet(client);
++    BlockingInterface tajoMasterService = client.getStub();
++    return tajoMasterService.dropIndex(null,
++        connection.convertSessionedString(indexName)).getValue();
++
++//    return new ServerCallable<Boolean>(connection.manager,
++//        connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++//      @Override
++//      public Boolean call(NettyClientBase client) throws Exception {
++//
++//      }
++//    }.withRetries();
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 73abc4c,53889fe..007c010
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@@ -153,28 -153,21 +153,40 @@@ public class QueryClientImpl implement
  
    @Override
    public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
- 
-     return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
- 
-         final QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(sql);
-         builder.setIsJson(false);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
- 
-         SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-         }
-         return response;
-       }
-     }.withRetries();
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+ 
+     final QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(sql);
+     builder.setIsJson(false);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
 -
 -
+     SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+     }
+     return response;
++
++//<<<<<<< HEAD
++//        connection.checkSessionAndGet(client);
++//
++//        final QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(sql);
++//        builder.setIsJson(false);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//
++//
++//      }
++//    }.withRetries();
++//=======
++//    SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
++//    if (response.getResultCode() == ResultCode.OK) {
++//      connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//    }
++//    return response;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    @Override
@@@ -369,42 -356,26 +375,60 @@@
        throws ServiceException {
  
      try {
-       final ServerCallable<ClientProtos.SerializedResultSet> callable =
-           new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
-               TajoMasterClientProtocol.class, false) {
- 
-             public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
- 
-               connection.checkSessionAndGet(client);
-               TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-               GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
-               builder.setSessionId(connection.sessionId);
-               builder.setQueryId(queryId.getProto());
-               builder.setFetchRowNum(fetchRowNum);
-               try {
-                 GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
-                 if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
-                   abort();
-                   throw new ServiceException(response.getResult().getErrorMessage());
-                 }
- 
-                 return response.getResultSet();
-               } catch (ServiceException e) {
-                 abort();
-                 throw e;
-               } catch (Throwable t) {
-                 throw new ServiceException(t.getMessage(), t);
-               }
-             }
-           };
- 
-       ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//<<<<<<< HEAD
++//      final ServerCallable<ClientProtos.SerializedResultSet> callable =
++//          new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
++//              TajoMasterClientProtocol.class, false) {
++//
++//            public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
++//
++//              connection.checkSessionAndGet(client);
++//              TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//              GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
++//              builder.setSessionId(connection.sessionId);
++//              builder.setQueryId(queryId.getProto());
++//              builder.setFetchRowNum(fetchRowNum);
++//              try {
++//                GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
++//                if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++//                  abort();
++//                  throw new ServiceException(response.getResult().getErrorMessage());
++//                }
++//
++//                return response.getResultSet();
++//              } catch (ServiceException e) {
++//                abort();
++//                throw e;
++//              } catch (Throwable t) {
++//                throw new ServiceException(t.getMessage(), t);
++//              }
++//            }
++//          };
++//
++//      ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//=======
+       NettyClientBase client = connection.getTajoMasterConnection();
+       connection.checkSessionAndGet(client);
+       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+       GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+       builder.setSessionId(connection.sessionId);
+       builder.setQueryId(queryId.getProto());
+       builder.setFetchRowNum(fetchRowNum);
+ 
+       GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
 -      if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
 -        throw new ServiceException(response.getErrorMessage());
++      if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++        throw new ServiceException(response.getResult().getErrorMessage());
+       }
+ 
+       ClientProtos.SerializedResultSet resultSet = response.getResultSet();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
        return new TajoMemoryResultSet(queryId,
-           new Schema(serializedResultSet.getSchema()),
-           serializedResultSet.getSerializedTuplesList(),
-           serializedResultSet.getSerializedTuplesCount(),
+           new Schema(resultSet.getSchema()),
+           resultSet.getSerializedTuplesList(),
+           resultSet.getSerializedTuplesCount(),
            getClientSideSessionVars());
      } catch (ServiceException e) {
        throw e;
@@@ -416,59 -387,47 +440,92 @@@
    @Override
    public boolean updateQuery(final String sql) throws ServiceException {
  
-     return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-         QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(sql);
-         builder.setIsJson(false);
-         ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- 
-         if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return true;
-         } else {
-           if (response.getResult().hasErrorMessage()) {
-             System.err.println("ERROR: " + response.getResult().getErrorMessage());
-           }
-           return false;
-         }
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+     QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(sql);
+     builder.setIsJson(false);
+     ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+ 
 -    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++//<<<<<<< HEAD
++//        connection.checkSessionAndGet(client);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//        QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(sql);
++//        builder.setIsJson(false);
++//        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//
++//        if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++//          connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return true;
++//        } else {
++//          if (response.getResult().hasErrorMessage()) {
++//            System.err.println("ERROR: " + response.getResult().getErrorMessage());
++//          }
++//          return false;
++//        }
++//=======
++    if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return true;
+     } else {
 -      if (response.hasErrorMessage()) {
 -        LOG.error("ERROR: " + response.getErrorMessage());
++      if (response.getResult().hasErrorMessage()) {
++        LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       return false;
+     }
    }
  
    @Override
    public boolean updateQueryWithJson(final String json) throws ServiceException {
  
-     return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
- 
-         connection.checkSessionAndGet(client);
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- 
-         QueryRequest.Builder builder = QueryRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQuery(json);
-         builder.setIsJson(true);
-         ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-         if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
-           return true;
-         } else {
-           if (response.getResult().hasErrorMessage()) {
-             System.err.println("ERROR: " + response.getResult().getErrorMessage());
-           }
-           return false;
-         }
++//<<<<<<< HEAD
++//    return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public Boolean call(NettyClientBase client) throws ServiceException {
++//
++//        connection.checkSessionAndGet(client);
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//        QueryRequest.Builder builder = QueryRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQuery(json);
++//        builder.setIsJson(true);
++//        ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//        if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++//          return true;
++//        } else {
++//          if (response.getResult().hasErrorMessage()) {
++//            System.err.println("ERROR: " + response.getResult().getErrorMessage());
++//          }
++//          return false;
++//        }
++//=======
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ 
+     QueryRequest.Builder builder = QueryRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQuery(json);
+     builder.setIsJson(true);
+     ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
 -    if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++    if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+       return true;
+     } else {
 -      if (response.hasErrorMessage()) {
 -        LOG.error("ERROR: " + response.getErrorMessage());
++      if (response.getResult().hasErrorMessage()) {
++        LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       return false;
+     }
    }
  
    @Override
@@@ -581,25 -519,20 +617,42 @@@
    }
    
    public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
-     return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
-       public QueryInfoProto call(NettyClientBase client) throws ServiceException {
-         connection.checkSessionAndGet(client);
- 
-         QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQueryId(queryId.getProto());
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
-         if (res.getResult().getResultCode() == ResultCode.OK) {
-           return res.getQueryInfo();
-         } else {
-           abort();
-           throw new ServiceException(res.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++//<<<<<<< HEAD
++//    return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//      public QueryInfoProto call(NettyClientBase client) throws ServiceException {
++//        connection.checkSessionAndGet(client);
++//
++//        QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
++//        builder.setSessionId(connection.sessionId);
++//        builder.setQueryId(queryId.getProto());
++//
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
++//        if (res.getResult().getResultCode() == ResultCode.OK) {
++//          return res.getQueryInfo();
++//        } else {
++//          abort();
++//          throw new ServiceException(res.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//=======
+     NettyClientBase client = connection.getTajoMasterConnection();
+     connection.checkSessionAndGet(client);
+ 
+     QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+     builder.setSessionId(connection.sessionId);
+     builder.setQueryId(queryId.getProto());
+ 
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
 -    if (res.getResultCode() == ResultCode.OK) {
++    if (res.getResult().getResultCode() == ResultCode.OK) {
+       return res.getQueryInfo();
+     } else {
 -      throw new ServiceException(res.getErrorMessage());
++      throw new ServiceException(res.getResult().getErrorMessage());
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@@ -611,24 -544,31 +664,42 @@@
      InetSocketAddress qmAddress = new InetSocketAddress(
          queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
  
-     return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
-         QueryMasterClientProtocol.class, false) {
-       public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
-         connection.checkSessionAndGet(client);
+     RpcClientManager manager = RpcClientManager.getInstance();
+     NettyClientBase queryMasterClient;
+     try {
+       queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+           manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+     } catch (Exception e) {
+       throw new ServiceException(e);
+     }
  
-         QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-         builder.setSessionId(connection.sessionId);
-         builder.setQueryId(queryId.getProto());
+     try {
+       connection.checkSessionAndGet(connection.getTajoMasterConnection());
+ 
++//<<<<<<< HEAD
++//        QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
++//        GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
++//        if (res.getResult().getResultCode() == ResultCode.OK) {
++//          return res.getQueryHistory();
++//        } else {
++//          abort();
++//          throw new ServiceException(res.getResult().getErrorMessage());
++//        }
++//=======
+       QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+       builder.setSessionId(connection.sessionId);
+       builder.setQueryId(queryId.getProto());
  
-         QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
-         GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
-         if (res.getResult().getResultCode() == ResultCode.OK) {
-           return res.getQueryHistory();
-         } else {
-           abort();
-           throw new ServiceException(res.getResult().getErrorMessage());
-         }
+       QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+       GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
 -      if (res.getResultCode() == ResultCode.OK) {
++      if (res.getResult().getResultCode() == ResultCode.OK) {
+         return res.getQueryHistory();
+       } else {
 -        throw new ServiceException(res.getErrorMessage());
++        throw new ServiceException(res.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+     } finally {
+       queryMasterClient.close();
+     }
    }
  }

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bb15981,84decd5..1bb0e16
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@@ -157,52 -160,43 +160,81 @@@ public class SessionConnection implemen
    }
  
    public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-     return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
-         TajoMasterClientProtocol.class, false) {
- 
-       public Map<String, String> call(NettyClientBase client) throws ServiceException {
-         checkSessionAndGet(client);
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         KeyValueSet keyValueSet = new KeyValueSet();
-         keyValueSet.putAll(variables);
-         ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-             .setSessionId(sessionId)
-             .setSessionVars(keyValueSet.getProto()).build();
- 
-         SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
- 
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return Collections.unmodifiableMap(sessionVarsCache);
-         } else {
-           throw new ServiceException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
++//<<<<<<< HEAD
++//    return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
++//        TajoMasterClientProtocol.class, false) {
++//
++//      public Map<String, String> call(NettyClientBase client) throws ServiceException {
++//        checkSessionAndGet(client);
++//
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        KeyValueSet keyValueSet = new KeyValueSet();
++//        keyValueSet.putAll(variables);
++//        ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++//            .setSessionId(sessionId)
++//            .setSessionVars(keyValueSet.getProto()).build();
++//
++//        SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
++//
++//        if (response.getResult().getResultCode() == ResultCode.OK) {
++//          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return Collections.unmodifiableMap(sessionVarsCache);
++//        } else {
++//          throw new ServiceException(response.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//  }
++//=======
+     NettyClientBase client = getTajoMasterConnection();
+     checkSessionAndGet(client);
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+ 
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     KeyValueSet keyValueSet = new KeyValueSet();
+     keyValueSet.putAll(variables);
+     ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+         .setSessionId(sessionId)
+         .setSessionVars(keyValueSet.getProto()).build();
+ 
+     SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+ 
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return Collections.unmodifiableMap(sessionVarsCache);
+     } else {
 -      throw new ServiceException(response.getMessage());
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
    }
  
-   public Map<String, String> unsetSessionVariables(final List<String> variables)  throws ServiceException {
-     return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public Map<String, String> call(NettyClientBase client) throws ServiceException {
-         checkSessionAndGet(client);
- 
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-             .setSessionId(sessionId)
-             .addAllUnsetVariables(variables).build();
- 
-         SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
- 
-         if (response.getResult().getResultCode() == ResultCode.OK) {
-           updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-           return Collections.unmodifiableMap(sessionVarsCache);
-         } else {
-           throw new ServiceException(response.getResult().getErrorMessage());
-         }
-       }
-     }.withRetries();
+   public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+     NettyClientBase client = getTajoMasterConnection();
+     checkSessionAndGet(client);
+ 
++//<<<<<<< HEAD
++//        if (response.getResult().getResultCode() == ResultCode.OK) {
++//          updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++//          return Collections.unmodifiableMap(sessionVarsCache);
++//        } else {
++//          throw new ServiceException(response.getResult().getErrorMessage());
++//        }
++//      }
++//    }.withRetries();
++//=======
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+         .setSessionId(sessionId)
+         .addAllUnsetVariables(variables).build();
+ 
+     SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+ 
 -    if (response.getResultCode() == ResultCode.OK) {
++    if (response.getResult().getResultCode() == ResultCode.OK) {
+       updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+       return Collections.unmodifiableMap(sessionVarsCache);
+     } else {
 -      throw new ServiceException(response.getMessage());
++      throw new ServiceException(response.getResult().getErrorMessage());
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
    }
  
    void updateSessionVarsCache(Map<String, String> variables) {
@@@ -333,55 -308,51 +346,81 @@@
    }
  
    public boolean reconnect() throws Exception {
-     return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
- 
-       public Boolean call(NettyClientBase client) throws ServiceException {
-         CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
-         builder.setUsername(userInfo.getUserName()).build();
-         if (baseDatabase != null) {
-           builder.setBaseDatabaseName(baseDatabase);
-         }
- 
+     CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+     builder.setUsername(userInfo.getUserName()).build();
+     if (baseDatabase != null) {
+       builder.setBaseDatabaseName(baseDatabase);
+     }
  
-         // create new session
-         TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-         CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
-         if (response.getResult().getResultCode() != ResultCode.OK) {
-           return false;
-         }
+     NettyClientBase client = getTajoMasterConnection();
+ 
++//<<<<<<< HEAD
++//        // create new session
++//        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//        CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
++//        if (response.getResult().getResultCode() != ResultCode.OK) {
++//          return false;
++//        }
++//=======
+     // create new session
+     TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+     CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
 -    if (response.getResultCode() != ResultCode.OK) {
++    if (response.getResult().getResultCode() != ResultCode.OK) {
+       return false;
+     }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
  
-         // Invalidate some session variables in client cache
-         sessionId = response.getSessionId();
-         Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
-         synchronized (sessionVarsCache) {
-           for (SessionVars var : UPDATE_ON_RECONNECT) {
-             String value = sessionVars.get(var.keyname());
-             if (value != null) {
-               sessionVarsCache.put(var.keyname(), value);
-             }
-           }
+     // Invalidate some session variables in client cache
+     sessionId = response.getSessionId();
+     Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+     synchronized (sessionVarsCache) {
+       for (SessionVars var : UPDATE_ON_RECONNECT) {
+         String value = sessionVars.get(var.keyname());
+         if (value != null) {
+           sessionVarsCache.put(var.keyname(), value);
          }
+       }
+     }
  
-         // Update the session variables in server side
-         try {
-           KeyValueSet keyValueSet = new KeyValueSet();
-           keyValueSet.putAll(sessionVarsCache);
-           ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
-               .setSessionId(sessionId)
-               .setSessionVars(keyValueSet.getProto()).build();
- 
-           if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
-             tajoMasterService.removeSession(null, sessionId);
-             return false;
-           }
-           LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
-           return true;
-         } catch (ServiceException e) {
-           tajoMasterService.removeSession(null, sessionId);
-           return false;
-         }
++//<<<<<<< HEAD
++//        // Update the session variables in server side
++//        try {
++//          KeyValueSet keyValueSet = new KeyValueSet();
++//          keyValueSet.putAll(sessionVarsCache);
++//          ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++//              .setSessionId(sessionId)
++//              .setSessionVars(keyValueSet.getProto()).build();
++//
++//          if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
++//            tajoMasterService.removeSession(null, sessionId);
++//            return false;
++//          }
++//          LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
++//          return true;
++//        } catch (ServiceException e) {
++//          tajoMasterService.removeSession(null, sessionId);
++//          return false;
++//        }
++//=======
+     // Update the session variables in server side
+     try {
+       KeyValueSet keyValueSet = new KeyValueSet();
+       keyValueSet.putAll(sessionVarsCache);
+       ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+           .setSessionId(sessionId)
+           .setSessionVars(keyValueSet.getProto()).build();
+ 
 -      if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
++      if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
+         tajoMasterService.removeSession(null, sessionId);
+         return false;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
        }
-     }.withRetries();
+       LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+       return true;
+     } catch (ServiceException e) {
+       tajoMasterService.removeSession(null, sessionId);
+       return false;
+     }
    }
  
    /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index be1cdf2,b1a27fa..7ad658f
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -157,24 -153,55 +153,55 @@@ public class TestKillQuery 
    @Test
    public final void testIgnoreStageStateFromKilled() throws Exception {
  
-     ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
-     QueryId queryId = new QueryId(res.getQueryId());
-     cluster.waitForQuerySubmitted(queryId);
+     SQLAnalyzer analyzer = new SQLAnalyzer();
+     QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+     Session session = LocalTajoTestingUtility.createDummySession();
+     CatalogService catalog = cluster.getMaster().getCatalog();
+ 
+     LogicalPlanner planner = new LogicalPlanner(catalog);
 -    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++    LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+     Expr expr =  analyzer.parse(queryStr);
+     LogicalPlan plan = planner.createPlan(defaultContext, expr);
+ 
+     optimizer.optimize(plan);
+ 
+     QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+     QueryContext queryContext = new QueryContext(conf);
+     MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+     GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+     globalPlanner.build(masterPlan);
  
-     QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
-     Query query = qmt.getQuery();
+     CountDownLatch barrier  = new CountDownLatch(1);
+     MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+ 
+     QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+     QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+         queryId, session, defaultContext, expr.toJson(), dispatch);
  
-     // wait for a stage created
-     cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
-     query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+     queryMasterTask.init(conf);
+     queryMasterTask.getQueryTaskContext().getDispatcher().start();
+     queryMasterTask.startQuery();
  
      try{
-       cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
-     } finally {
-       assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+       barrier.await(5000, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException e) {
+       fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+     }
+ 
+     Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+     assertNotNull(stage);
+ 
+     // fire kill event
+     queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+ 
+     try {
+       cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+       assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+     }   finally {
+       queryMasterTask.stop();
      }
  
-     List<Stage> stages = Lists.newArrayList(query.getStages());
+     List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
      Stage lastStage = stages.get(stages.size() - 1);
  
      assertEquals(StageState.KILLED, lastStage.getSynchronizedState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------