You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/05 11:11:51 UTC

[2/3] tajo git commit: TAJO-1228: TajoClient should communicate with only TajoMaster without TajoWorker.

TAJO-1228: TajoClient should communicate with only TajoMaster without TajoWorker.

Closes #317


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cb9793b9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cb9793b9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cb9793b9

Branch: refs/heads/index_support
Commit: cb9793b990f1c882e3371a44e6c3f28fe913c1a7
Parents: 7a1ac28
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Jan 5 17:44:14 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Jan 5 17:44:14 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/client/QueryClientImpl.java |  99 +++------------
 .../apache/tajo/client/SessionConnection.java   |  19 +--
 tajo-client/src/main/proto/ClientProtos.proto   |  22 ++--
 .../main/proto/QueryMasterClientProtocol.proto  |   4 -
 .../tajo/master/TajoMasterClientService.java    |  18 +--
 .../master/querymaster/QueryInProgress.java     |  66 +++++-----
 .../tajo/master/querymaster/QueryInfo.java      |  41 +++++-
 .../master/querymaster/QueryJobManager.java     |   5 +
 .../tajo/master/querymaster/QueryMaster.java    |   6 +-
 .../tajo/worker/TajoWorkerClientService.java    | 127 -------------------
 .../src/main/proto/TajoMasterProtocol.proto     |   7 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  24 +---
 13 files changed, 135 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b0b1685..8a4ffe1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1228: TajoClient should communicate with only TajoMaster without 
+    TajoWorker. (hyunsik)
+
     TAJO-1176: Implements queryable virtual tables for catalog information
     (jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index f923965..bab3518 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -36,7 +36,6 @@ import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOException;
@@ -95,19 +94,7 @@ public class QueryClientImpl implements QueryClient {
 
   @Override
   public void closeQuery(QueryId queryId) {
-    if(connection.queryMasterMap.containsKey(queryId)) {
-      NettyClientBase qmClient = null;
-      try {
-        qmClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false);
-        QueryMasterClientProtocolService.BlockingInterface queryMaster = qmClient.getStub();
-        queryMaster.closeQuery(null, queryId.getProto());
-      } catch (Exception e) {
-        LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
-      } finally {
-        connection.connPool.closeConnection(qmClient);
-        connection.queryMasterMap.remove(queryId);
-      }
-    }
+    // nothing to do
   }
 
   @Override
@@ -318,63 +305,21 @@ public class QueryClientImpl implements QueryClient {
     ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder();
     builder.setQueryId(queryId.getProto());
 
-    ClientProtos.GetQueryStatusResponse res = null;
-
-    if(connection.queryMasterMap.containsKey(queryId)) {
-      NettyClientBase qmClient = null;
-
-      try {
-
-        qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId),
-            QueryMasterClientProtocol.class, false);
-        QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
-        res = queryMasterService.getQueryStatus(null, builder.build());
-
-      } catch (Exception e) {
-        throw new ServiceException(e.getMessage(), e);
-      } finally {
-        connection.connPool.releaseConnection(qmClient);
-      }
-
-    } else {
+    GetQueryStatusResponse res = null;
 
-      NettyClientBase tmClient = null;
-
-      try {
-        tmClient = connection.getTajoMasterConnection(false);
-        connection.checkSessionAndGet(tmClient);
-        builder.setSessionId(connection.sessionId);
-        TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-
-        res = tajoMasterService.getQueryStatus(null, builder.build());
-
-        String queryMasterHost = res.getQueryMasterHost();
-
-        if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
-          NettyClientBase qmClient = null;
-
-          try {
-
-            InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
-            qmClient = connection.connPool.getConnection(
-                qmAddr, QueryMasterClientProtocol.class, false);
-            QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
-            res = queryMasterService.getQueryStatus(null, builder.build());
-
-            connection.queryMasterMap.put(queryId, qmAddr);
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connection.getTajoMasterConnection(false);
+      connection.checkSessionAndGet(tmClient);
+      builder.setSessionId(connection.sessionId);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
-          } catch (Exception e) {
-            throw new ServiceException(e.getMessage(), e);
-          } finally {
-            connection.connPool.releaseConnection(qmClient);
-          }
-        }
+      res = tajoMasterService.getQueryStatus(null, builder.build());
 
-      } catch (Exception e) {
-        throw new ServiceException(e.getMessage(), e);
-      } finally {
-        connection.connPool.releaseConnection(tmClient);
-      }
+    } catch (Exception e) {
+      throw new ServiceException(e.getMessage(), e);
+    } finally {
+      connection.connPool.releaseConnection(tmClient);
     }
     return new QueryStatus(res);
   }
@@ -404,29 +349,25 @@ public class QueryClientImpl implements QueryClient {
       return null;
     }
 
-    NettyClientBase client = null;
+    NettyClientBase tmClient = null;
 
     try {
 
-      InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId);
-      if(queryMasterAddr == null) {
-        LOG.warn("No Connection to QueryMaster for " + queryId);
-        return null;
-      }
-
-      client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
-      QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
+      tmClient = connection.getTajoMasterConnection(false);
+      connection.checkSessionAndGet(tmClient);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
 
       GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
       builder.setQueryId(queryId.getProto());
-      GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build());
+      builder.setSessionId(connection.sessionId);
+      GetQueryResultResponse response = tajoMasterService.getQueryResult(null,builder.build());
 
       return response;
 
     } catch (Exception e) {
       throw new ServiceException(e.getMessage(), e);
     } finally {
-      connection.connPool.releaseConnection(client);
+      connection.connPool.releaseConnection(tmClient);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index c849f2d..1bc8050 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,7 +21,7 @@ package org.apache.tajo.client;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
@@ -46,7 +46,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
@@ -59,8 +58,6 @@ public class SessionConnection implements Closeable {
 
   private final TajoConf conf;
 
-  final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
-
   final InetSocketAddress tajoMasterAddr;
 
   final RpcConnectionPool connPool;
@@ -117,23 +114,11 @@ public class SessionConnection implements Closeable {
     return Collections.unmodifiableMap(sessionVarsCache);
   }
 
-  public <T> T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException,
-      ConnectTimeoutException, ClassNotFoundException {
-    InetSocketAddress addr = queryMasterMap.get(queryId);
-    return connPool.getConnection(addr, protocolClass, asyncMode).getStub();
-  }
-
   public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
       ConnectTimeoutException, ClassNotFoundException {
     return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
   }
 
-  public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode)
-      throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
-    InetSocketAddress addr = queryMasterMap.get(queryId);
-    return connPool.getConnection(addr, protocolClass, asyncMode);
-  }
-
   public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
       throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
     return connPool.getConnection(addr, protocolClass, asyncMode);
@@ -321,8 +306,6 @@ public class SessionConnection implements Closeable {
     if(connPool != null) {
       connPool.shutdown();
     }
-
-    queryMasterMap.clear();
   }
 
   protected InetSocketAddress getTajoMasterAddr() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index a741268..a9f5498 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -87,7 +87,7 @@ message GetQueryResultRequest {
 message GetQueryResultResponse {
   optional TableDescProto tableDesc = 1;
   optional string errorMessage = 2;
-  required string tajoUserName = 3;
+  optional string tajoUserName = 3;
 }
 
 message QueryIdRequest {
@@ -242,15 +242,17 @@ message FunctionResponse {
 message QueryInfoProto {
   required string queryId = 1;
   optional string sql = 2;
-  optional QueryState queryState = 3;
-  optional float progress = 4;
-  optional int64 startTime = 5;
-  optional int64 finishTime = 6;
-  optional string lastMessage = 7;
-  optional string hostNameOfQM = 8;
-  optional int32 queryMasterPort = 9;
-  optional int32 queryMasterClientPort = 10;
-  optional int32 queryMasterInfoPort = 11;
+  optional KeyValueSetProto contextVars= 3;
+  optional QueryState queryState = 4;
+  optional float progress = 5;
+  optional int64 startTime = 6;
+  optional int64 finishTime = 7;
+  optional string lastMessage = 8;
+  optional string hostNameOfQM = 9;
+  optional int32 queryMasterPort = 10;
+  optional int32 queryMasterClientPort = 11;
+  optional int32 queryMasterInfoPort = 12;
+  optional TableDescProto resultDesc = 13;
 }
 
 message StageHistoryProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
index 3d8d70b..0b11566 100644
--- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
@@ -28,9 +28,5 @@ import "PrimitiveProtos.proto";
 import "ClientProtos.proto";
 
 service QueryMasterClientProtocolService {
-  rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
-  rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
-  rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
-  rpc closeQuery(QueryIdProto) returns (BoolProto);
   rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c413b65..249d335 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -345,6 +345,7 @@ public class TajoMasterClientService extends AbstractService {
         }
 
         GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder();
+        builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
 
         // If we cannot the QueryInfo instance from the finished list,
         // the query result was expired due to timeout.
@@ -354,20 +355,16 @@ public class TajoMasterClientService extends AbstractService {
           return builder.build();
         }
 
-        try {
-          //TODO After implementation Tajo's user security feature, Should be modified.
-          builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
-        } catch (IOException e) {
-          LOG.warn("Can't get current user name");
-        }
         switch (queryInfo.getQueryState()) {
           case QUERY_SUCCEEDED:
-            // TODO check this logic needed
-            //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+            if (queryInfo.hasResultdesc()) {
+              builder.setTableDesc(queryInfo.getResultDesc().getProto());
+            }
             break;
           case QUERY_FAILED:
           case QUERY_ERROR:
             builder.setErrorMessage("Query " + queryId + " is failed");
+            break;
           default:
             builder.setErrorMessage("Query " + queryId + " is still running");
         }
@@ -479,6 +476,11 @@ public class TajoMasterClientService extends AbstractService {
           if (queryInfo != null) {
             builder.setResultCode(ResultCode.OK);
             builder.setState(queryInfo.getQueryState());
+
+            boolean isCreateTable = queryInfo.getQueryContext().isCreateTable();
+            boolean isInsert = queryInfo.getQueryContext().isInsert();
+            builder.setHasResult(!(isCreateTable || isInsert));
+
             builder.setProgress(queryInfo.getProgress());
             builder.setSubmitTime(queryInfo.getStartTime());
             if(queryInfo.getQueryMasterHost() != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index e361c7f..ca0bd72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -24,13 +24,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -39,6 +36,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -57,8 +55,6 @@ public class QueryInProgress extends CompositeService {
 
   private Session session;
 
-  private QueryContext queryContext;
-
   private TajoAsyncDispatcher dispatcher;
 
   private LogicalRootNode plan;
@@ -75,8 +71,6 @@ public class QueryInProgress extends CompositeService {
 
   private QueryMasterProtocolService queryMasterRpcClient;
 
-  private ContainerProtocol.TajoContainerIdProto qmContainerId;
-
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
       Session session,
@@ -85,11 +79,10 @@ public class QueryInProgress extends CompositeService {
     super(QueryInProgress.class.getName());
     this.masterContext = masterContext;
     this.session = session;
-    this.queryContext = queryContext;
     this.queryId = queryId;
     this.plan = plan;
 
-    queryInfo = new QueryInfo(queryId, sql, jsonExpr);
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
     queryInfo.setStartTime(System.currentTimeMillis());
   }
 
@@ -145,7 +138,7 @@ public class QueryInProgress extends CompositeService {
     }
 
     if(queryMasterRpc != null) {
-      RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
+      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
     }
 
     masterContext.getHistoryWriter().appendHistory(queryInfo);
@@ -212,7 +205,7 @@ public class QueryInProgress extends CompositeService {
     InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
     LOG.info("Connect to QueryMaster:" + addr);
     queryMasterRpc =
-        RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
     queryMasterRpcClient = queryMasterRpc.getStub();
   }
 
@@ -235,8 +228,8 @@ public class QueryInProgress extends CompositeService {
 
       QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
       builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
           .setSession(session.getProto())
-          .setQueryContext(queryContext.getProto())
           .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
           .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
 
@@ -267,28 +260,37 @@ public class QueryInProgress extends CompositeService {
 
   private void heartbeat(QueryInfo queryInfo) {
     LOG.info("Received QueryMaster heartbeat:" + queryInfo);
-    this.queryInfo.setQueryState(queryInfo.getQueryState());
-    this.queryInfo.setProgress(queryInfo.getProgress());
-    this.queryInfo.setFinishTime(queryInfo.getFinishTime());
 
-    if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
-      this.queryInfo.setLastMessage(queryInfo.getLastMessage());
-      LOG.info(queryId + queryInfo.getLastMessage());
-    }
-    if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
-      //TODO needed QueryMaster's detail status(failed before or after launching worker)
-      //queryMasterStopped.set(true);
-      LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
-    }
+    // to avoid partial update by different heartbeats
+    synchronized (this.queryInfo) {
 
-    if(!querySubmitted.get()) {
-      getEventHandler().handle(
-          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
-    }
+      // A terminal state lets the client retrieve the query result,
+      // so we must set the query result before changing the query state.
+      if (isFinishState(queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
 
-    if(isFinishState(this.queryInfo.getQueryState())) {
-      getEventHandler().handle(
-          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        LOG.info(queryId + queryInfo.getLastMessage());
+      }
+
+      // If any error occurs, print out the error message.
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      }
+
+
+      if (isFinishState(this.queryInfo.getQueryState())) {
+        stop();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 00b95ac..559fc14 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -22,7 +22,9 @@ package org.apache.tajo.master.querymaster;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TajoIdUtils;
@@ -31,15 +33,17 @@ import org.apache.tajo.util.history.History;
 public class QueryInfo implements GsonObject, History {
   private QueryId queryId;
   @Expose
+  private QueryContext context;
+  @Expose
   private String sql;
   @Expose
-  private TajoProtos.QueryState queryState;
+  private volatile TajoProtos.QueryState queryState;
   @Expose
-  private float progress;
+  private volatile float progress;
   @Expose
-  private long startTime;
+  private volatile long startTime;
   @Expose
-  private long finishTime;
+  private volatile  long finishTime;
   @Expose
   private String lastMessage;
   @Expose
@@ -52,18 +56,22 @@ public class QueryInfo implements GsonObject, History {
   private int queryMasterInfoPort;
   @Expose
   private String queryIdStr;
+  @Expose
+  private volatile TableDesc resultDesc;
 
   private String jsonExpr;
 
   public QueryInfo(QueryId queryId) {
-    this(queryId, null, null);
+    this(queryId, null, null, null);
   }
 
-  public QueryInfo(QueryId queryId, String sql, String jsonExpr) {
+  public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) {
     this.queryId = queryId;
     this.queryIdStr = queryId.toString();
+    this.context = queryContext;
     this.sql = sql;
     this.jsonExpr = jsonExpr;
+
     this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
   }
 
@@ -71,6 +79,10 @@ public class QueryInfo implements GsonObject, History {
     return queryId;
   }
 
+  public QueryContext getQueryContext() {
+    return context;
+  }
+
   public String getSql() {
     return sql;
   }
@@ -147,6 +159,18 @@ public class QueryInfo implements GsonObject, History {
     this.progress = progress;
   }
 
+  public void setResultDesc(TableDesc result) {
+    this.resultDesc = result;
+  }
+
+  public boolean hasResultdesc() {
+    return resultDesc != null;
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
   @Override
   public String toString() {
     return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
@@ -182,6 +206,7 @@ public class QueryInfo implements GsonObject, History {
 
     builder.setQueryId(queryId.toString())
         .setQueryState(queryState)
+        .setContextVars(context.getProto())
         .setProgress(progress)
         .setStartTime(startTime)
         .setFinishTime(finishTime)
@@ -189,6 +214,10 @@ public class QueryInfo implements GsonObject, History {
         .setQueryMasterClientPort(queryMasterClientPort)
         .setQueryMasterInfoPort(queryMasterInfoPort);
 
+    if (resultDesc != null) {
+      builder.setResultDesc(resultDesc.getProto());
+    }
+
     if (sql != null) {
       builder.setSql(sql);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index ddbd3e1..34a0d01 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
@@ -300,6 +301,10 @@ public class QueryJobManager extends CompositeService {
       queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
     }
 
+    if (queryHeartbeat.hasResultDesc()) {
+      queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+    }
+
     return queryInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 7ddd787..7623026 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -454,10 +454,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
     TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
 
     builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
-    builder.setState(queryMasterTask.getState());
     builder.setQueryId(queryMasterTask.getQueryId().getProto());
-
+    builder.setState(queryMasterTask.getState());
     if (queryMasterTask.getQuery() != null) {
+      if (queryMasterTask.getQuery().getResultDesc() != null) {
+        builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+      }
       builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
       builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 0f4a60c..1c83110 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -24,31 +24,20 @@ import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
 import org.apache.tajo.ipc.ClientProtos.ResultCode;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.rpc.BlockingRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.history.QueryHistory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Collection;
 
 public class TajoWorkerClientService extends AbstractService {
   private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
@@ -79,15 +68,12 @@ public class TajoWorkerClientService extends AbstractService {
     this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
 
     // init RPC Server in constructor cause Heartbeat Thread use bindAddr
-    // Setup RPC server
     try {
-      // TODO initial port num is value of config and find unused port with sequence
       InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
       if (initIsa.getAddress() == null) {
         throw new IllegalArgumentException("Failed resolve of " + initIsa);
       }
 
-      // TODO blocking/non-blocking??
       int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
       this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
       this.rpcServer.start();
@@ -124,119 +110,6 @@ public class TajoWorkerClientService extends AbstractService {
 
   public class TajoWorkerClientProtocolServiceHandler
           implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
-    @Override
-    public PrimitiveProtos.BoolProto updateSessionVariables(
-            RpcController controller,
-            ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
-      return null;
-    }
-
-    private boolean hasResultTableDesc(QueryContext queryContext) {
-      return !(queryContext.isCreateTable() || queryContext.isInsert());
-    }
-
-    @Override
-    public ClientProtos.GetQueryResultResponse getQueryResult(
-            RpcController controller,
-            ClientProtos.GetQueryResultRequest request) throws ServiceException {
-      QueryId queryId = new QueryId(request.getQueryId());
-      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
-
-      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
-      try {
-        builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
-      } catch (IOException e) {
-        LOG.warn("Can't get current user name");
-      }
-
-      if(queryMasterTask == null || queryMasterTask.getQuery() == null) {
-        builder.setErrorMessage("No Query for " + queryId);
-      } else {
-        switch (queryMasterTask.getState()) {
-          case QUERY_SUCCEEDED:
-//            if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) {
-              builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
-            //}
-            break;
-          case QUERY_FAILED:
-          case QUERY_ERROR:
-            builder.setErrorMessage("Query " + queryId + " is failed");
-          default:
-            builder.setErrorMessage("Query " + queryId + " is still running");
-        }
-      }
-      return builder.build();
-    }
-
-    @Override
-    public ClientProtos.GetQueryStatusResponse getQueryStatus(
-            RpcController controller,
-            ClientProtos.GetQueryStatusRequest request) throws ServiceException {
-      ClientProtos.GetQueryStatusResponse.Builder builder
-              = ClientProtos.GetQueryStatusResponse.newBuilder();
-      QueryId queryId = new QueryId(request.getQueryId());
-
-      builder.setQueryId(request.getQueryId());
-
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        builder.setResultCode(ClientProtos.ResultCode.OK);
-        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
-      } else {
-        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
-
-        builder.setResultCode(ClientProtos.ResultCode.OK);
-        builder.setQueryMasterHost(bindAddr.getHostName());
-        builder.setQueryMasterPort(bindAddr.getPort());
-
-        if (queryMasterTask == null) {
-          queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
-        }
-        if (queryMasterTask == null) {
-          builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
-          return builder.build();
-        }
-
-        builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext()));
-
-        queryMasterTask.touchSessionTime();
-        Query query = queryMasterTask.getQuery();
-
-        if (query != null) {
-          builder.setState(queryMasterTask.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } 
-        Collection<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = queryMasterTask.getDiagnostics();
-        if(!diagnostics.isEmpty()) {
-          TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next();
-          builder.setErrorMessage(firstError.getErrorMessage());
-          builder.setErrorTrace(firstError.getErrorTrace());
-        }
-
-        if (queryMasterTask.isInitError()) {
-          Throwable initError = queryMasterTask.getInitError();
-          builder.setErrorMessage(
-              initError.getMessage() == null ? initError.getClass().getName() : initError.getMessage());
-          builder.setErrorTrace(StringUtils.stringifyException(initError));
-          builder.setState(queryMasterTask.getState());
-        }
-      }
-      return builder.build();
-    }
-
-    @Override
-    public PrimitiveProtos.BoolProto closeQuery (
-            RpcController controller,
-            TajoIdProtos.QueryIdProto request) throws ServiceException {
-      final QueryId queryId = new QueryId(request);
-      LOG.info("Stop Query:" + queryId);
-      return BOOL_TRUE;
-    }
 
     @Override
     public GetQueryHistoryResponse getQueryHistory(RpcController controller, QueryIdRequest request) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index e5eab4f..cc83e47 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -66,9 +66,10 @@ message TajoHeartbeat {
   required WorkerConnectionInfoProto connectionInfo = 1;
   optional QueryIdProto queryId = 2;
   optional QueryState state = 3;
-  optional string statusMessage = 4;
-  optional float queryProgress = 5;
-  optional int64 queryFinishTime = 6;
+  optional TableDescProto resultDesc = 4;
+  optional string statusMessage = 5;
+  optional float queryProgress = 6;
+  optional int64 queryFinishTime = 7;
 }
 
 message TajoHeartbeatResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 5ff637c..841be45 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,8 +44,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.*;
 import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.Stage;
 import org.apache.tajo.master.querymaster.StageState;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -781,14 +781,16 @@ public class TajoTestingCluster {
   }
 
   public void waitForQueryRunning(QueryId queryId, int delay) throws Exception {
-    QueryMasterTask qmt = null;
+    QueryInProgress qip = null;
 
     int i = 0;
-    while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
+    while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) {
       try {
         Thread.sleep(delay);
-        if(qmt == null){
-          qmt = getQueryMasterTask(queryId);
+        if(qip == null){
+
+          TajoMaster master = getMaster();
+          qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId);
         }
       } catch (InterruptedException e) {
       }
@@ -824,16 +826,4 @@ public class TajoTestingCluster {
       }
     }
   }
-
-  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
-    QueryMasterTask qmt = null;
-    for (TajoWorker worker : getTajoWorkers()) {
-      qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId);
-      if (qmt != null) {
-        break;
-      }
-    }
-
-    return qmt;
-  }
 }