You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/29 06:50:08 UTC

[2/6] TAJO-1140: Separate TajoClient into fine grained parts.

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 82bd855..376f63f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -18,1031 +18,10 @@
 
 package org.apache.tajo.client;
 
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.annotation.ThreadSafe;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto;
-import org.apache.tajo.cli.InvalidClientSessionException;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.ClientProtos.*;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
-import org.apache.tajo.jdbc.FetchResultSet;
-import org.apache.tajo.jdbc.SQLStates;
-import org.apache.tajo.jdbc.TajoMemoryResultSet;
-import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
-import org.apache.tajo.util.HAServiceUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.NetUtils;
 
 import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 
 @ThreadSafe
-public class TajoClient implements Closeable {
-  public static final int UNKNOWN_ROW_NUMBER = -1;
-
-  private final Log LOG = LogFactory.getLog(TajoClient.class);
-
-  private final TajoConf conf;
-
-  private final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
-
-  private final InetSocketAddress tajoMasterAddr;
-
-  private final RpcConnectionPool connPool;
-
-  private final String baseDatabase;
-
-  private final UserGroupInformation userInfo;
-
-  private volatile TajoIdProtos.SessionIdProto sessionId;
-
-  private AtomicBoolean closed = new AtomicBoolean(false);
-
-  public TajoClient(TajoConf conf) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
-  }
-
-  public TajoClient(TajoConf conf, @Nullable String baseDatabase) throws IOException {
-    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
-  }
-
-  /**
-   * Connect to TajoMaster
-   *
-   * @param conf TajoConf
-   * @param addr TajoMaster address
-   * @param baseDatabase The base database name. It is case sensitive. If it is null,
-   *                     the 'default' database will be used.
-   * @throws IOException
-   */
-  public TajoClient(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
-    this.conf = conf;
-    this.conf.set("tajo.disk.scheduler.report.interval", "0");
-    this.tajoMasterAddr = addr;
-    int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
-    // Don't share connection pool per client
-    connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
-    userInfo = UserGroupInformation.getCurrentUser();
-    this.baseDatabase = baseDatabase != null ? baseDatabase : null;
-  }
-
-  public void setSessionId(TajoIdProtos.SessionIdProto sessionId) {
-      this.sessionId = sessionId;
-  }
-
-  public boolean isConnected() {
-    if(!closed.get()){
-      try {
-        return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
-      } catch (Throwable e) {
-        return false;
-      }
-    }
-    return false;
-  }
-
-  public TajoClient(InetSocketAddress addr) throws IOException {
-    this(new TajoConf(), addr, null);
-  }
-
-  public TajoClient(String hostname, int port, String baseDatabase) throws IOException {
-    this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
-  }
-
-  public TajoIdProtos.SessionIdProto getSessionId() {
-    return sessionId;
-  }
-
-  private InetSocketAddress getTajoMasterAddr() {
-    if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
-      return tajoMasterAddr;
-    } else {
-      if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
-        return HAServiceUtil.getMasterClientAddress(conf);
-      } else {
-        return tajoMasterAddr;
-      }
-    }
-  }
-
-  public String getBaseDatabase() {
-    return baseDatabase;
-  }
-
-  @Override
-  public void close() {
-    if(closed.getAndSet(true)){
-      return;
-    }
-
-    // remove session
-    try {
-      NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
-      TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
-      tajoMaster.removeSession(null, sessionId);
-    } catch (Throwable e) {
-    }
-
-    if(connPool != null) {
-      connPool.shutdown();
-    }
-    queryMasterMap.clear();
-  }
-
-  public TajoConf getConf() {
-    return conf;
-  }
-
-  public UserGroupInformation getUserInfo() {
-    return userInfo;
-  }
-
-  /**
-   * Call to QueryMaster closing query resources
-   * @param queryId
-   */
-  public void closeQuery(final QueryId queryId) {
-    if(queryMasterMap.containsKey(queryId)) {
-      NettyClientBase qmClient = null;
-      try {
-        qmClient = connPool.getConnection(queryMasterMap.get(queryId), QueryMasterClientProtocol.class, false);
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
-        queryMasterService.closeQuery(null, queryId.getProto());
-      } catch (Exception e) {
-        LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
-      } finally {
-        connPool.closeConnection(qmClient);
-        queryMasterMap.remove(queryId);
-      }
-    }
-  }
-
-  public void closeNonForwardQuery(final QueryId queryId) {
-    NettyClientBase tmClient = null;
-    try {
-      tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
-      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-      checkSessionAndGet(tmClient);
-
-      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-
-      builder.setSessionId(getSessionId());
-      builder.setQueryId(queryId.getProto());
-      tajoMasterService.closeNonForwardQuery(null, builder.build());
-    } catch (Exception e) {
-      LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
-    } finally {
-      connPool.closeConnection(tmClient);
-    }
-  }
-
-  private void checkSessionAndGet(NettyClientBase client) throws ServiceException {
-    if (sessionId == null) {
-      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-      CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
-      builder.setUsername(userInfo.getUserName()).build();
-      if (baseDatabase != null) {
-        builder.setBaseDatabaseName(baseDatabase);
-      }
-      CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
-      if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) {
-        sessionId = response.getSessionId();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
-        }
-      } else {
-        throw new InvalidClientSessionException(response.getMessage());
-      }
-    }
-  }
-
-  private SessionedStringProto convertSessionedString(String str) {
-    SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
-    builder.setSessionId(sessionId);
-    builder.setValue(str);
-    return builder.build();
-  }
-
-  public String getCurrentDatabase() throws ServiceException {
-    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
-      }
-    }.withRetries();
-  }
-
-  public Boolean selectDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
-  }
-
-  public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        KeyValueSet keyValueSet = new KeyValueSet();
-        keyValueSet.putAll(variables);
-        UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .setSetVariables(keyValueSet.getProto()).build();
-
-        return tajoMasterService.updateSessionVariables(null, request).getValue();
-      }
-    }.withRetries();
-  }
-
-  public Boolean unsetSessionVariables(final List<String> variables)  throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
-            .setSessionId(sessionId)
-            .addAllUnsetVariables(variables).build();
-        return tajoMasterService.updateSessionVariables(null, request).getValue();
-      }
-    }.withRetries();
-  }
-
-  public String getSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public String call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
-  }
-
-  public Boolean existSessionVariable(final String varname) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
-      }
-    }.withRetries();
-  }
-
-  public Map<String, String> getAllSessionVariables() throws ServiceException {
-    return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
-        false, true) {
-
-      public Map<String, String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        KeyValueSet keyValueSet = new KeyValueSet(tajoMasterService.getAllSessionVariables(null, sessionId));
-        return keyValueSet.getAllKeyValus();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * It submits a query statement and get a response immediately.
-   * The response only contains a query id, and submission status.
-   * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
-   * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
-   */
-  public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
-    return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.submitQuery(null, builder.build());
-      }
-    }.withRetries();
-  }
-
-  public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
-    return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        final QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.submitQuery(null, builder.build());
-      }
-    }.withRetries();
-  }
-
-  /**
-   * It submits a query statement and get a response.
-   * The main difference from {@link #executeQuery(String)}
-   * is a blocking method. So, this method is wait for
-   * the finish of the submitted query.
-   *
-   * @return If failed, return null.
-   */
-  public ResultSet executeQueryAndGetResult(final String sql)
-      throws ServiceException, IOException {
-    SubmitQueryResponse response = executeQuery(sql);
-
-    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-      throw new ServiceException(response.getErrorTrace());
-    }
-    QueryId queryId = new QueryId(response.getQueryId());
-    if (response.getIsForwarded()) {
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        return this.createNullResultSet(queryId);
-      } else {
-        return this.getQueryResultAndWait(queryId);
-      }
-    } else {
-      // If a non-forwarded insert into query
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) {
-        return this.createNullResultSet(queryId);
-      } else {
-        if (response.hasResultSet() || response.hasTableDesc()) {
-          return createResultSet(this, response);
-        } else {
-          return this.createNullResultSet(queryId);
-        }
-      }
-    }
-  }
-
-  public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException {
-    SubmitQueryResponse response = executeQueryWithJson(json);
-    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
-      throw new ServiceException(response.getErrorTrace());
-    }
-    QueryId queryId = new QueryId(response.getQueryId());
-    if (response.getIsForwarded()) {
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        return this.createNullResultSet(queryId);
-      } else {
-        return this.getQueryResultAndWait(queryId);
-      }
-    } else {
-      if (response.hasResultSet() || response.hasTableDesc()) {
-        return createResultSet(this, response);
-      } else {
-        return this.createNullResultSet(queryId);
-      }
-    }
-  }
-
-  public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
-    GetQueryStatusRequest.Builder builder = GetQueryStatusRequest.newBuilder();
-    builder.setQueryId(queryId.getProto());
-
-    GetQueryStatusResponse res = null;
-    if(queryMasterMap.containsKey(queryId)) {
-      NettyClientBase qmClient = null;
-      try {
-        qmClient = connPool.getConnection(queryMasterMap.get(queryId),
-            QueryMasterClientProtocol.class, false);
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
-        res = queryMasterService.getQueryStatus(null, builder.build());
-      } catch (Exception e) {
-        throw new ServiceException(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(qmClient);
-      }
-    } else {
-      NettyClientBase tmClient = null;
-      try {
-        tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
-
-        checkSessionAndGet(tmClient);
-        builder.setSessionId(sessionId);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-        res = tajoMasterService.getQueryStatus(null, builder.build());
-
-        String queryMasterHost = res.getQueryMasterHost();
-        if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
-          NettyClientBase qmClient = null;
-          try {
-            InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
-            qmClient = connPool.getConnection(
-                qmAddr, QueryMasterClientProtocol.class, false);
-            QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
-            res = queryMasterService.getQueryStatus(null, builder.build());
-
-            queryMasterMap.put(queryId, qmAddr);
-          } catch (Exception e) {
-            throw new ServiceException(e.getMessage(), e);
-          } finally {
-            connPool.releaseConnection(qmClient);
-          }
-        }
-      } catch (Exception e) {
-        throw new ServiceException(e.getMessage(), e);
-      } finally {
-        connPool.releaseConnection(tmClient);
-      }
-    }
-    return new QueryStatus(res);
-  }
-
-  /* query submit */
-  public static boolean isInPreNewState(QueryState state) {
-    return state == QueryState.QUERY_NOT_ASSIGNED ||
-        state == QueryState.QUERY_MASTER_INIT ||
-        state == QueryState.QUERY_MASTER_LAUNCHED;
-  }
-
-  /* query submitted. but is not running */
-  public static boolean isInInitState(QueryState state) {
-    return  state == QueryState.QUERY_NEW || state == QueryState.QUERY_INIT;
-  }
-
-  /* query started. but is not complete */
-  public static boolean isInRunningState(QueryState state) {
-    return isInInitState(state) || state == QueryState.QUERY_RUNNING;
-  }
-
-  /* query complete */
-  public static boolean isInCompleteState(QueryState state) {
-    return !isInPreNewState(state) && !isInRunningState(state);
-  }
-
-  public ResultSet getQueryResult(QueryId queryId)
-      throws ServiceException, IOException {
-    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-      return createNullResultSet(queryId);
-    }
-    GetQueryResultResponse response = getResultResponse(queryId);
-    TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
-    conf.setVar(ConfVars.USERNAME, response.getTajoUserName());
-    return new TajoResultSet(this, queryId, conf, tableDesc);
-  }
-
-  public static ResultSet createResultSet(TajoClient client, QueryId queryId, GetQueryResultResponse response)
-      throws IOException {
-    TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
-    TajoConf conf = new TajoConf(client.getConf());
-    conf.setVar(ConfVars.USERNAME, response.getTajoUserName());
-    return new TajoResultSet(client, queryId, conf, desc);
-  }
-
-  public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException {
-    if (response.hasTableDesc()) {
-      // non-forward query
-      // select * from table1 [limit 10]
-      int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM);
-      if (response.hasSessionVariables()) {
-        for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) {
-          if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
-            fetchRowNum = Integer.parseInt(eachKeyValue.getValue());
-          }
-        }
-      }
-      TableDesc tableDesc = new TableDesc(response.getTableDesc());
-      return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum);
-    } else {
-      // simple eval query
-      // select substr('abc', 1, 2)
-      SerializedResultSet serializedResultSet = response.getResultSet();
-      return new TajoMemoryResultSet(
-          new Schema(serializedResultSet.getSchema()),
-          serializedResultSet.getSerializedTuplesList(),
-          response.getMaxRowNum());
-    }
-  }
-
-  private ResultSet getQueryResultAndWait(QueryId queryId)
-      throws ServiceException, IOException {
-    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-      return createNullResultSet(queryId);
-    }
-    QueryStatus status = getQueryStatus(queryId);
-
-    while(status != null && !isInCompleteState(status.getState())) {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-
-      status = getQueryStatus(queryId);
-    }
-
-    if (status.getState() == QueryState.QUERY_SUCCEEDED) {
-      if (status.hasResult()) {
-        return getQueryResult(queryId);
-      } else {
-        return createNullResultSet(queryId);
-      }
-
-    } else {
-      LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState());
-
-      //TODO throw SQLException(?)
-      return createNullResultSet(queryId);
-    }
-  }
-
-  public ResultSet createNullResultSet(QueryId queryId) throws IOException {
-    return new TajoResultSet(this, queryId);
-  }
-
-  public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
-    if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-      return null;
-    }
-
-    NettyClientBase client = null;
-    try {
-      InetSocketAddress queryMasterAddr = queryMasterMap.get(queryId);
-      if(queryMasterAddr == null) {
-        LOG.warn("No Connection to QueryMaster for " + queryId);
-        return null;
-      }
-      client = connPool.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
-      QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
-      GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
-      builder.setQueryId(queryId.getProto());
-      GetQueryResultResponse response = queryMasterService.getQueryResult(null,
-          builder.build());
-
-      return response;
-    } catch (Exception e) {
-      throw new ServiceException(e.getMessage(), e);
-    } finally {
-      connPool.releaseConnection(client);
-    }
-  }
-
-  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
-    try {
-      ServerCallable<SerializedResultSet> callable =
-          new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-        public SerializedResultSet call(NettyClientBase client) throws ServiceException {
-          checkSessionAndGet(client);
-          TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-          GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
-          builder.setSessionId(sessionId);
-          builder.setQueryId(queryId.getProto());
-          builder.setFetchRowNum(fetchRowNum);
-          try {
-            GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
-            if (response.getResultCode() == ResultCode.ERROR) {
-              throw new ServiceException(response.getErrorTrace());
-            }
-
-            return response.getResultSet();
-          } catch (ServiceException e) {
-            abort();
-            throw e;
-          } catch (Throwable t) {
-            throw new ServiceException(t.getMessage(), t);
-          }
-        }
-      };
-
-      SerializedResultSet serializedResultSet = callable.withRetries();
-
-      return new TajoMemoryResultSet(
-          new Schema(serializedResultSet.getSchema()),
-          serializedResultSet.getSerializedTuplesList(),
-          serializedResultSet.getSerializedTuplesCount());
-    } catch (Exception e) {
-      throw new ServiceException(e.getMessage(), e);
-    }
-  }
-
-  public boolean updateQuery(final String sql) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setQuery(sql);
-        builder.setIsJson(false);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-        if (response.getResultCode() == ResultCode.OK) {
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
-      }
-    }.withRetries();
-  }
-
-  public boolean updateQueryWithJson(final String json) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        QueryRequest.Builder builder = QueryRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setQuery(json);
-        builder.setIsJson(true);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-        if (response.getResultCode() == ResultCode.OK) {
-          return true;
-        } else {
-          if (response.hasErrorMessage()) {
-            System.err.println("ERROR: " + response.getErrorMessage());
-          }
-          return false;
-        }
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Create a database.
-   *
-   * @param databaseName The database name to be created. This name is case sensitive.
-   * @return True if created successfully.
-   * @throws ServiceException
-   */
-  public boolean createDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.createDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Does the database exist?
-   *
-   * @param databaseName The database name to be checked. This name is case sensitive.
-   * @return True if so.
-   * @throws ServiceException
-   */
-  public boolean existDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Drop the database
-   *
-   * @param databaseName The database name to be dropped. This name is case sensitive.
-   * @return True if the database is dropped successfully.
-   * @throws ServiceException
-   */
-  public boolean dropDatabase(final String databaseName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.dropDatabase(null, convertSessionedString(databaseName)).getValue();
-      }
-    }.withRetries();
-  }
-
-  public List<String> getAllDatabaseNames() throws ServiceException {
-    return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-      public List<String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.getAllDatabases(null, sessionId).getValuesList();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Does the table exist?
-   *
-   * @param tableName The table name to be checked. This name is case sensitive.
-   * @return True if so.
-   */
-  public boolean existTable(final String tableName) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        return tajoMasterService.existTable(null, convertSessionedString(tableName)).getValue();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Create an external table.
-   *
-   * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
-   *                  If the table name is not qualified, the current database in the session will be used.
-   * @param schema The schema
-   * @param path The external table location
-   * @param meta Table meta
-   * @return the created table description.
-   * @throws SQLException
-   * @throws ServiceException
-   */
-  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
-                                       final TableMeta meta)
-      throws SQLException, ServiceException {
-    return createExternalTable(tableName, schema, path, meta, null);
-  }
-
-  /**
-   * Create an external table.
-   *
-   * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
-   *                  If the table name is not qualified, the current database in the session will be used.
-   * @param schema The schema
-   * @param path The external table location
-   * @param meta Table meta
-   * @param partitionMethodDesc Table partition description
-   * @return the created table description.
-   * @throws SQLException
-   * @throws ServiceException
-   */
-  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
-                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
-      throws SQLException, ServiceException {
-    return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setName(tableName);
-        builder.setSchema(schema.getProto());
-        builder.setMeta(meta.getProto());
-        builder.setPath(path.toUri().toString());
-        if (partitionMethodDesc != null) {
-          builder.setPartition(partitionMethodDesc.getProto());
-        }
-        TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Drop a table
-   *
-   * @param tableName The table name to be dropped. This name is case sensitive.
-   * @return True if the table is dropped successfully.
-   */
-  public boolean dropTable(final String tableName) throws ServiceException {
-    return dropTable(tableName, false);
-  }
-
-  /**
-   * Drop a table.
-   *
-   * @param tableName The table name to be dropped. This name is case sensitive.
-   * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents.
-   * @return True if the table is dropped successfully.
-   */
-  public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
-    return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public Boolean call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        DropTableRequest.Builder builder = DropTableRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setName(tableName);
-        builder.setPurge(purge);
-        return tajoMasterService.dropTable(null, builder.build()).getValue();
-      }
-    }.withRetries();
-
-  }
-
-  public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
-    return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
-        return res.getQueryListList();
-      }
-    }.withRetries();
-  }
-
-  public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
-    return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
-        return res.getQueryListList();
-      }
-    }.withRetries();
-  }
-
-  public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
-    return new ServerCallable<List<WorkerResourceInfo>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        GetClusterInfoRequest.Builder builder = GetClusterInfoRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
-        return res.getWorkerListList();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Get a list of table names.
-   *
-   * @param databaseName The database name to show all tables. This name is case sensitive.
-   *                     If it is null, this method will show all tables
-   *                     in the current database of this session.
-   */
-  public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
-    return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public List<String> call(NettyClientBase client) throws ServiceException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        if (databaseName != null) {
-          builder.setDatabaseName(databaseName);
-        }
-        GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
-        return res.getTablesList();
-      }
-    }.withRetries();
-  }
-
-  /**
-   * Get a table description
-   *
-   * @param tableName The table name to get. This name is case sensitive.
-   * @return Table description
-   */
-  public TableDesc getTableDesc(final String tableName) throws ServiceException {
-    return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-      public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-        GetTableDescRequest.Builder builder = GetTableDescRequest.newBuilder();
-        builder.setSessionId(sessionId);
-        builder.setTableName(tableName);
-        TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
-        if (res.getResultCode() == ResultCode.OK) {
-          return CatalogUtil.newTableDesc(res.getTableDesc());
-        } else {
-          throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
-        }
-      }
-    }.withRetries();
-  }
-
-  public QueryStatus killQuery(final QueryId queryId)
-      throws ServiceException, IOException {
-
-    QueryStatus status = getQueryStatus(queryId);
-
-    NettyClientBase tmClient = null;
-    try {
-      /* send a kill to the TM */
-      tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
-      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-
-      checkSessionAndGet(tmClient);
-
-      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-      builder.setSessionId(sessionId);
-      builder.setQueryId(queryId.getProto());
-      tajoMasterService.killQuery(null, builder.build());
-
-      long currentTimeMillis = System.currentTimeMillis();
-      long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L)
-          && ((status.getState() != QueryState.QUERY_KILLED)
-          || (status.getState() == QueryState.QUERY_KILL_WAIT))) {
-        try {
-          Thread.sleep(100L);
-        } catch(InterruptedException ie) {
-          break;
-        }
-        currentTimeMillis = System.currentTimeMillis();
-        status = getQueryStatus(queryId);
-      }
-
-    } catch(Exception e) {
-      LOG.debug("Error when checking for application status", e);
-    } finally {
-      connPool.releaseConnection(tmClient);
-    }
-    return status;
-  }
-
-  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
-    return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, getTajoMasterAddr(),
-        TajoMasterClientProtocol.class, false, true) {
-      public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-        checkSessionAndGet(client);
-
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-        String paramFunctionName = functionName == null ? "" : functionName;
-        FunctionResponse res = tajoMasterService.getFunctionList(null,convertSessionedString(paramFunctionName));
-        if (res.getResultCode() == ResultCode.OK) {
-          return res.getFunctionsList();
-        } else {
-          throw new SQLException(res.getErrorMessage());
-        }
-      }
-    }.withRetries();
-  }
+public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable {
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
new file mode 100644
index 0000000..75de492
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
+import org.apache.tajo.ipc.ClientProtos.GetQueryResultResponse;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+@ThreadSafe
+public class TajoClientImpl extends SessionConnection implements TajoClient, QueryClient, CatalogAdminClient {
+
+  private final Log LOG = LogFactory.getLog(TajoClientImpl.class);
+  QueryClient queryClient;
+  CatalogAdminClient catalogClient;
+
+  public TajoClientImpl(TajoConf conf) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
+  }
+
+  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
+  }
+
+  public TajoClientImpl(InetSocketAddress addr) throws IOException {
+    this(new TajoConf(), addr, null);
+  }
+
+  /**
+   * Connect to TajoMaster
+   *
+   * @param conf TajoConf
+   * @param addr TajoMaster address
+   * @param baseDatabase The base database name. It is case sensitive. If it is null,
+   *                     the 'default' database will be used.
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+    super(conf, addr, baseDatabase);
+    this.queryClient = new QueryClientImpl(this);
+    this.catalogClient = new CatalogAdminClientImpl(this);
+  }
+
+  public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
+    super(hostName, port, baseDatabase);
+    this.queryClient = new QueryClientImpl(this);
+    this.catalogClient = new CatalogAdminClientImpl(this);
+  }
+
+  /*------------------------------------------------------------------------*/
+  // QueryClient wrappers
+  /*------------------------------------------------------------------------*/
+
+  public void closeQuery(final QueryId queryId) {
+    queryClient.closeQuery(queryId);
+  }
+
+  public void closeNonForwardQuery(final QueryId queryId) {
+    queryClient.closeNonForwardQuery(queryId);
+  }
+
+  public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+    return queryClient.executeQuery(sql);
+  }
+
+  public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
+    return queryClient.executeQueryWithJson(json);
+  }
+
+  public ResultSet executeQueryAndGetResult(final String sql) throws ServiceException, IOException {
+    return queryClient.executeQueryAndGetResult(sql);
+  }
+
+  public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException {
+    return queryClient.executeJsonQueryAndGetResult(json);
+  }
+
+  public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
+    return queryClient.getQueryStatus(queryId);
+  }
+
+  public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException {
+    return queryClient.getQueryResult(queryId);
+  }
+
+  public ResultSet createNullResultSet(QueryId queryId) throws IOException {
+    return new TajoResultSet(this, queryId);
+  }
+
+  public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
+    return queryClient.getResultResponse(queryId);
+  }
+
+  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
+    return queryClient.fetchNextQueryResult(queryId, fetchRowNum);
+  }
+
+  public boolean updateQuery(final String sql) throws ServiceException {
+    return queryClient.updateQuery(sql);
+  }
+
+  public boolean updateQueryWithJson(final String json) throws ServiceException {
+    return queryClient.updateQueryWithJson(json);
+  }
+
+  public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException {
+    return queryClient.killQuery(queryId);
+  }
+
+  public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
+    return queryClient.getRunningQueryList();
+  }
+
+  public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
+    return queryClient.getFinishedQueryList();
+  }
+
+  public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
+    return queryClient.getClusterInfo();
+  }
+
+  /*------------------------------------------------------------------------*/
+  // CatalogClient wrappers
+  /*------------------------------------------------------------------------*/
+
+  public boolean createDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.createDatabase(databaseName);
+  }
+
+  public boolean existDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.existDatabase(databaseName);
+  }
+
+  public boolean dropDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.dropDatabase(databaseName);
+  }
+
+  public List<String> getAllDatabaseNames() throws ServiceException {
+    return catalogClient.getAllDatabaseNames();
+  }
+
+  public boolean existTable(final String tableName) throws ServiceException {
+    return catalogClient.existTable(tableName);
+  }
+
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta) throws SQLException, ServiceException {
+    return catalogClient.createExternalTable(tableName, schema, path, meta);
+  }
+
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+      throws SQLException, ServiceException {
+    return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc);
+  }
+
+  public boolean dropTable(final String tableName) throws ServiceException {
+    return dropTable(tableName, false);
+  }
+
+  public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
+    return catalogClient.dropTable(tableName, purge);
+  }
+
+  public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
+    return catalogClient.getTableList(databaseName);
+  }
+
+  public TableDesc getTableDesc(final String tableName) throws ServiceException {
+    return catalogClient.getTableDesc(tableName);
+  }
+
+  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
+    return catalogClient.getFunctions(functionName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
new file mode 100644
index 0000000..7aed335
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.FetchResultSet;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+public class TajoClientUtil {
+
+  /* query submit */
+  public static boolean isQueryWaitingForSchedule(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_NOT_ASSIGNED ||
+        state == TajoProtos.QueryState.QUERY_MASTER_INIT ||
+        state == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
+  }
+
+  /* query submitted. but is not running */
+  public static boolean isQueryInited(TajoProtos.QueryState state) {
+    return  state == TajoProtos.QueryState.QUERY_NEW || state == TajoProtos.QueryState.QUERY_INIT;
+  }
+
+  /* query started. but is not complete */
+  public static boolean isQueryRunning(TajoProtos.QueryState state) {
+    return isQueryInited(state) || state == TajoProtos.QueryState.QUERY_RUNNING;
+  }
+
+  /* query complete */
+  public static boolean isQueryComplete(TajoProtos.QueryState state) {
+    return !isQueryWaitingForSchedule(state) && !isQueryRunning(state);
+  }
+
+  public static ResultSet createResultSet(TajoConf conf, TajoClient client, QueryId queryId,
+                                          ClientProtos.GetQueryResultResponse response)
+      throws IOException {
+    TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
+    conf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
+    return new TajoResultSet(client, queryId, conf, desc);
+  }
+
+  public static ResultSet createResultSet(TajoConf conf, QueryClient client, ClientProtos.SubmitQueryResponse response)
+      throws IOException {
+    if (response.hasTableDesc()) {
+      // non-forward query
+      // select * from table1 [limit 10]
+      int fetchRowNum = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM);
+      if (response.hasSessionVariables()) {
+        for (PrimitiveProtos.KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) {
+          if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
+            fetchRowNum = Integer.parseInt(eachKeyValue.getValue());
+          }
+        }
+      }
+      TableDesc tableDesc = new TableDesc(response.getTableDesc());
+      return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum);
+    } else {
+      // simple eval query
+      // select substr('abc', 1, 2)
+      ClientProtos.SerializedResultSet serializedResultSet = response.getResultSet();
+      return new TajoMemoryResultSet(
+          new Schema(serializedResultSet.getSchema()),
+          serializedResultSet.getSerializedTuplesList(),
+          response.getMaxRowNum());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
index 7628d9d..540f54b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
@@ -104,9 +104,9 @@ public class TajoDump {
       System.exit(-1);
     } else if (hostName != null && port != null) {
       conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
-      client = new TajoClient(conf);
+      client = new TajoClientImpl(conf);
     } else {
-      client = new TajoClient(conf);
+      client = new TajoClientImpl(conf);
     }
 
     PrintWriter writer = new PrintWriter(System.out);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 2377427..88ab491 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -112,9 +112,9 @@ public class TajoGetConf {
       return;
     } else if (hostName != null && port != null) {
       tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     } else if (hostName == null && port == null) {
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     }
 
     processConfKey(writer, param);
@@ -123,13 +123,13 @@ public class TajoGetConf {
 
   private void processConfKey(Writer writer, String param) throws ParseException, IOException,
       ServiceException, SQLException {
-    String value = tajoClient.getConf().getTrimmed(param);
+    String value = tajoConf.getTrimmed(param);
 
     // If there is no value in the configuration file, we need to find all ConfVars.
     if (value == null) {
       for(TajoConf.ConfVars vars : TajoConf.ConfVars.values()) {
         if (vars.varname.equalsIgnoreCase(param)) {
-          value = tajoClient.getConf().getVar(vars);
+          value = tajoConf.getVar(vars);
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
index 11cb4ed..5d5cf71 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
@@ -126,9 +126,9 @@ public class TajoHAAdmin {
       return;
     } else if (hostName != null && port != null) {
       tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     } else if (hostName == null && port == null) {
-      tajoClient = new TajoClient(tajoConf);
+      tajoClient = new TajoClientImpl(tajoConf);
     }
 
     if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
@@ -159,7 +159,7 @@ public class TajoHAAdmin {
   private void getState(Writer writer, String param) throws ParseException, IOException,
       ServiceException {
     tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
-    int retValue = HAServiceUtil.getState(param, tajoClient.getConf());
+    int retValue = HAServiceUtil.getState(param, tajoConf);
 
     switch (retValue) {
       case 1:
@@ -179,7 +179,7 @@ public class TajoHAAdmin {
 
   private void formatHA(Writer writer) throws ParseException, IOException,
       ServiceException {
-    int retValue = HAServiceUtil.formatHA(tajoClient.getConf());
+    int retValue = HAServiceUtil.formatHA(tajoConf);
 
     switch (retValue) {
       case 1:

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index f22d5ba..b93590c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -71,7 +71,7 @@ public class TajoHAClientUtil {
         conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
             HAServiceUtil.getMasterClientName(conf));
         client.close();
-        tajoClient = new TajoClient(conf, baseDatabase);
+        tajoClient = new TajoClientImpl(conf, baseDatabase);
 
         if (context != null && context.getCurrentDatabase() != null) {
           tajoClient.selectDatabase(context.getCurrentDatabase());

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index 7ebce91..78674b1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -20,6 +20,7 @@ package org.apache.tajo.jdbc;
 
 import org.apache.tajo.QueryId;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.storage.Tuple;
 
@@ -27,13 +28,13 @@ import java.io.IOException;
 import java.sql.SQLException;
 
 public class FetchResultSet extends TajoResultSetBase {
-  private TajoClient tajoClient;
+  private QueryClient tajoClient;
   private QueryId queryId;
   private int fetchRowNum;
   private TajoMemoryResultSet currentResultSet;
   private boolean finished = false;
 
-  public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
+  public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
     this.tajoClient = tajoClient;
     this.queryId = queryId;
     this.fetchRowNum = fetchRowNum;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 65954f1..d78b04f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.FileScanner;
@@ -47,20 +48,20 @@ public class TajoResultSet extends TajoResultSetBase {
 
   private FileSystem fs;
   private Scanner scanner;
-  private TajoClient tajoClient;
+  private QueryClient tajoClient;
   private TajoConf conf;
   private TableDesc desc;
   private Long maxRowNum = null;
   private QueryId queryId;
   private AtomicBoolean closed = new AtomicBoolean(false);
 
-  public TajoResultSet(TajoClient tajoClient, QueryId queryId) {
+  public TajoResultSet(QueryClient tajoClient, QueryId queryId) {
     this.tajoClient = tajoClient;
     this.queryId = queryId;
     init();
   }
 
-  public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException {
+  public TajoResultSet(QueryClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException {
     this.tajoClient = tajoClient;
     this.queryId = queryId;
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index 91dcea1..b1b6450 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.CatalogConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.store.MemStore;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.util.FileUtil;
@@ -43,11 +44,11 @@ public abstract class BenchmarkSet {
   public void init(TajoConf conf, String dataDir) throws IOException {
     this.dataDir = dataDir;
     if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
-      tajo = new TajoClient(NetUtils.createSocketAddr(
+      tajo = new TajoClientImpl(NetUtils.createSocketAddr(
           System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
     } else {
       conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
-      tajo = new TajoClient(conf);
+      tajo = new TajoClientImpl(conf);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index cd3be98..f7c7b11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -41,7 +41,7 @@ import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
@@ -761,7 +761,7 @@ public class GlobalEngine extends AbstractService {
     stats.setNumBytes(totalSize);
 
     if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing.
-      stats.setNumRows(TajoClient.UNKNOWN_ROW_NUMBER);
+      stats.setNumRows(QueryClient.UNKNOWN_ROW_NUMBER);
     }
 
     TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 7ea2e48..7790ac6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -9,13 +9,10 @@ import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.QueryStatus;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoHAClientUtil;
+import org.apache.tajo.client.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.JSPUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.codehaus.jackson.map.DeserializationConfig;
@@ -67,18 +64,21 @@ public class QueryExecutorServlet extends HttpServlet {
   //queryRunnerId -> QueryRunner
   private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
 
+  private TajoConf tajoConf;
   private TajoClient tajoClient;
 
   private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
 
   private QueryRunnerCleaner queryRunnerCleaner;
+
   @Override
   public void init(ServletConfig config) throws ServletException {
     om.getDeserializationConfig().disable(
         DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
 
     try {
-      tajoClient = new TajoClient(new TajoConf());
+      tajoConf = new TajoConf();
+      tajoClient = new TajoClientImpl(tajoConf);
 
       queryRunnerCleaner = new QueryRunnerCleaner();
       queryRunnerCleaner.start();
@@ -273,8 +273,7 @@ public class QueryExecutorServlet extends HttpServlet {
     public void run() {
       startTime = System.currentTimeMillis();
       try {
-        TajoConf conf = tajoClient.getConf();
-        tajoClient = TajoHAClientUtil.getTajoClient(conf, tajoClient);
+        tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
 
         response = tajoClient.executeQuery(query);
 
@@ -319,7 +318,7 @@ public class QueryExecutorServlet extends HttpServlet {
           // non-forwarded INSERT INTO query does not have any query id.
           // In this case, it just returns succeeded query information without printing the query results.
         } else {
-          res = TajoClient.createResultSet(tajoClient, response);
+          res = TajoClientUtil.createResultSet(tajoConf, tajoClient, response);
           MakeResultText(res, desc);
         }
         progress.set(100);
@@ -399,8 +398,8 @@ public class QueryExecutorServlet extends HttpServlet {
               try {
                 ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId);
                 TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
-                tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
-                res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc);
+                tajoConf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
+                res = new TajoResultSet(tajoClient, queryId, tajoConf, desc);
 
                 MakeResultText(res, desc);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 805fe06..c6cac32 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -31,6 +31,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
@@ -102,7 +103,7 @@ public class LocalTajoTestingUtility {
     util = new TajoTestingCluster();
     util.startMiniCluster(1);
     conf = util.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
 
     FileSystem fs = util.getDefaultFileSystem();
     Path rootDir = util.getMaster().getStorageManager().getWarehouseDir();

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index a272b15..becb73e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.cli.ParsedResult;
 import org.apache.tajo.cli.SimpleParser;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
@@ -178,7 +179,7 @@ public class QueryTestCaseBase {
   @BeforeClass
   public static void setUpClass() throws IOException {
     conf = testBase.getTestingCluster().getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index aec11f6..452a17e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TajoMaster;
@@ -583,7 +585,7 @@ public class TajoTestingCluster {
       Thread.sleep(1000);
     }
     TajoConf conf = util.getConfiguration();
-    return new TajoClient(conf);
+    return new TajoClientImpl(conf);
   }
 
   public static ResultSet run(String[] names,
@@ -620,7 +622,7 @@ public class TajoTestingCluster {
       Thread.sleep(1000);
     }
     TajoConf conf = util.getConfiguration();
-    TajoClient client = new TajoClient(conf);
+    TajoClient client = new TajoClientImpl(conf);
 
     try {
       return run(names, schemas, tableOption, tables, query, client);
@@ -645,7 +647,7 @@ public class TajoTestingCluster {
       Thread.sleep(1000);
     }
     TajoConf conf = util.getConfiguration();
-    TajoClient client = new TajoClient(conf);
+    TajoClient client = new TajoClientImpl(conf);
     try {
       FileSystem fs = util.getDefaultFileSystem();
       Path rootDir = util.getMaster().
@@ -721,7 +723,7 @@ public class TajoTestingCluster {
     QueryMasterTask qmt = null;
 
     int i = 0;
-    while (qmt == null || TajoClient.isInPreNewState(qmt.getState())) {
+    while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
       try {
         Thread.sleep(delay);
         if(qmt == null){

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 4ede88e..719a775 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -62,7 +62,7 @@ public class TestTajoClient {
   public static void setUp() throws Exception {
     cluster = TpchTestBase.getInstance().getTestingCluster();
     conf = cluster.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
     testDir = CommonTestingUtil.getTestDir();
   }
 
@@ -658,7 +658,7 @@ public class TestTajoClient {
 
       QueryStatus queryStatus = client.getQueryStatus(queryId);
       assertNotNull(queryStatus);
-      assertTrue(TajoClient.isInCompleteState(queryStatus.getState()));
+      assertTrue(TajoClientUtil.isQueryComplete(queryStatus.getState()));
 
       TajoResultSet resultSet = (TajoResultSet) client.getQueryResult(queryId);
       assertNotNull(resultSet);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index e477939..08535ef 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -25,7 +25,7 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.QueryClient;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -419,7 +419,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       assertNotNull(rsmd);
       assertEquals(0, rsmd.getColumnCount());
 
-      TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient();
+      QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient();
       Map<String, String> variables = connTajoClient.getAllSessionVariables();
       String value = variables.get("JOIN_TASK_INPUT_SIZE");
       assertNotNull(value);
@@ -461,7 +461,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       assertNotNull(rsmd);
       assertEquals(0, rsmd.getColumnCount());
 
-      TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient();
+      QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient();
       Map<String, String> variables = connTajoClient.getAllSessionVariables();
       String value = variables.get("JOIN_TASK_INPUT_SIZE");
       assertNotNull(value);

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
index 86d18eb..249afae 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
 import org.junit.Test;
@@ -55,7 +56,7 @@ public class TestHAServiceHDFSImpl  {
     cluster.startMiniCluster(1);
 
     conf = cluster.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
 
     FileSystem fs = cluster.getDefaultFileSystem();
     startBackupMasters();

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
index 0f925bb..4a6ca00 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
@@ -21,6 +21,8 @@ package org.apache.tajo.master.querymaster;
 import org.apache.tajo.*;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 
@@ -41,7 +43,7 @@ public class TestQueryProgress {
   public static void setUp() throws Exception {
     cluster = TpchTestBase.getInstance().getTestingCluster();
     conf = cluster.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
   }
 
   @AfterClass
@@ -67,7 +69,7 @@ public class TestQueryProgress {
       prevProgress = progress;
       assertTrue(progress <= 1.0f);
 
-      if (TajoClient.isInCompleteState(status.getState())) break;
+      if (TajoClientUtil.isQueryComplete(status.getState())) break;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
index 069ee27..18764c2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -20,6 +20,8 @@ package org.apache.tajo.scheduler;
 
 import org.apache.tajo.*;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos;
 import org.junit.AfterClass;
@@ -41,7 +43,7 @@ public class TestFifoScheduler {
   public static void setUp() throws Exception {
     cluster = TpchTestBase.getInstance().getTestingCluster();
     conf = cluster.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
   }
 
   @AfterClass
@@ -75,7 +77,7 @@ public class TestFifoScheduler {
     cluster.waitForQueryRunning(queryId);
 
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
-    ResultSet resSet = TajoClient.createResultSet(client, res2);
+    ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2);
     assertNotNull(resSet);
 
     client.killQuery(queryId); //cleanup
@@ -95,7 +97,7 @@ public class TestFifoScheduler {
 
     cluster.waitForQueryRunning(queryId);
 
-    assertTrue(TajoClient.isInRunningState(client.getQueryStatus(queryId).getState()));
+    assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState()));
 
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());

http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index 3a85c14..c68d3a4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -49,7 +50,7 @@ public class TestHistory {
     cluster = TpchTestBase.getInstance().getTestingCluster();
     master = cluster.getMaster();
     conf = cluster.getConfiguration();
-    client = new TajoClient(conf);
+    client = new TajoClientImpl(conf);
   }
 
   @After