You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/01/05 11:11:51 UTC
[2/3] tajo git commit: TAJO-1228: TajoClient should communicate with
only TajoMaster without TajoWorker.
TAJO-1228: TajoClient should communicate with only TajoMaster without TajoWorker.
Closes #317
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cb9793b9
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cb9793b9
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cb9793b9
Branch: refs/heads/index_support
Commit: cb9793b990f1c882e3371a44e6c3f28fe913c1a7
Parents: 7a1ac28
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Jan 5 17:44:14 2015 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Jan 5 17:44:14 2015 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../org/apache/tajo/client/QueryClientImpl.java | 99 +++------------
.../apache/tajo/client/SessionConnection.java | 19 +--
tajo-client/src/main/proto/ClientProtos.proto | 22 ++--
.../main/proto/QueryMasterClientProtocol.proto | 4 -
.../tajo/master/TajoMasterClientService.java | 18 +--
.../master/querymaster/QueryInProgress.java | 66 +++++-----
.../tajo/master/querymaster/QueryInfo.java | 41 +++++-
.../master/querymaster/QueryJobManager.java | 5 +
.../tajo/master/querymaster/QueryMaster.java | 6 +-
.../tajo/worker/TajoWorkerClientService.java | 127 -------------------
.../src/main/proto/TajoMasterProtocol.proto | 7 +-
.../org/apache/tajo/TajoTestingCluster.java | 24 +---
13 files changed, 135 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b0b1685..8a4ffe1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1228: TajoClient should communicate with only TajoMaster without
+ TajoWorker. (hyunsik)
+
TAJO-1176: Implements queryable virtual tables for catalog information
(jihun)
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index f923965..bab3518 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -36,7 +36,6 @@ import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.jdbc.TajoResultSet;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.ProtoUtil;
import java.io.IOException;
@@ -95,19 +94,7 @@ public class QueryClientImpl implements QueryClient {
@Override
public void closeQuery(QueryId queryId) {
- if(connection.queryMasterMap.containsKey(queryId)) {
- NettyClientBase qmClient = null;
- try {
- qmClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMaster = qmClient.getStub();
- queryMaster.closeQuery(null, queryId.getProto());
- } catch (Exception e) {
- LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
- } finally {
- connection.connPool.closeConnection(qmClient);
- connection.queryMasterMap.remove(queryId);
- }
- }
+ // nothing to do
}
@Override
@@ -318,63 +305,21 @@ public class QueryClientImpl implements QueryClient {
ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder();
builder.setQueryId(queryId.getProto());
- ClientProtos.GetQueryStatusResponse res = null;
-
- if(connection.queryMasterMap.containsKey(queryId)) {
- NettyClientBase qmClient = null;
-
- try {
-
- qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId),
- QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
- res = queryMasterService.getQueryStatus(null, builder.build());
-
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connection.connPool.releaseConnection(qmClient);
- }
-
- } else {
+ GetQueryStatusResponse res = null;
- NettyClientBase tmClient = null;
-
- try {
- tmClient = connection.getTajoMasterConnection(false);
- connection.checkSessionAndGet(tmClient);
- builder.setSessionId(connection.sessionId);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-
- res = tajoMasterService.getQueryStatus(null, builder.build());
-
- String queryMasterHost = res.getQueryMasterHost();
-
- if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
- NettyClientBase qmClient = null;
-
- try {
-
- InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
- qmClient = connection.connPool.getConnection(
- qmAddr, QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
- res = queryMasterService.getQueryStatus(null, builder.build());
-
- connection.queryMasterMap.put(queryId, qmAddr);
+ NettyClientBase tmClient = null;
+ try {
+ tmClient = connection.getTajoMasterConnection(false);
+ connection.checkSessionAndGet(tmClient);
+ builder.setSessionId(connection.sessionId);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connection.connPool.releaseConnection(qmClient);
- }
- }
+ res = tajoMasterService.getQueryStatus(null, builder.build());
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connection.connPool.releaseConnection(tmClient);
- }
+ } catch (Exception e) {
+ throw new ServiceException(e.getMessage(), e);
+ } finally {
+ connection.connPool.releaseConnection(tmClient);
}
return new QueryStatus(res);
}
@@ -404,29 +349,25 @@ public class QueryClientImpl implements QueryClient {
return null;
}
- NettyClientBase client = null;
+ NettyClientBase tmClient = null;
try {
- InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId);
- if(queryMasterAddr == null) {
- LOG.warn("No Connection to QueryMaster for " + queryId);
- return null;
- }
-
- client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
+ tmClient = connection.getTajoMasterConnection(false);
+ connection.checkSessionAndGet(tmClient);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
builder.setQueryId(queryId.getProto());
- GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build());
+ builder.setSessionId(connection.sessionId);
+ GetQueryResultResponse response = tajoMasterService.getQueryResult(null,builder.build());
return response;
} catch (Exception e) {
throw new ServiceException(e.getMessage(), e);
} finally {
- connection.connPool.releaseConnection(client);
+ connection.connPool.releaseConnection(tmClient);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index c849f2d..1bc8050 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -21,7 +21,7 @@ package org.apache.tajo.client;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.QueryId;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.auth.UserRoleInfo;
@@ -46,7 +46,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
@@ -59,8 +58,6 @@ public class SessionConnection implements Closeable {
private final TajoConf conf;
- final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
-
final InetSocketAddress tajoMasterAddr;
final RpcConnectionPool connPool;
@@ -117,23 +114,11 @@ public class SessionConnection implements Closeable {
return Collections.unmodifiableMap(sessionVarsCache);
}
- public <T> T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException,
- ConnectTimeoutException, ClassNotFoundException {
- InetSocketAddress addr = queryMasterMap.get(queryId);
- return connPool.getConnection(addr, protocolClass, asyncMode).getStub();
- }
-
public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException,
ConnectTimeoutException, ClassNotFoundException {
return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode);
}
- public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode)
- throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
- InetSocketAddress addr = queryMasterMap.get(queryId);
- return connPool.getConnection(addr, protocolClass, asyncMode);
- }
-
public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode)
throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException {
return connPool.getConnection(addr, protocolClass, asyncMode);
@@ -321,8 +306,6 @@ public class SessionConnection implements Closeable {
if(connPool != null) {
connPool.shutdown();
}
-
- queryMasterMap.clear();
}
protected InetSocketAddress getTajoMasterAddr() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index a741268..a9f5498 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -87,7 +87,7 @@ message GetQueryResultRequest {
message GetQueryResultResponse {
optional TableDescProto tableDesc = 1;
optional string errorMessage = 2;
- required string tajoUserName = 3;
+ optional string tajoUserName = 3;
}
message QueryIdRequest {
@@ -242,15 +242,17 @@ message FunctionResponse {
message QueryInfoProto {
required string queryId = 1;
optional string sql = 2;
- optional QueryState queryState = 3;
- optional float progress = 4;
- optional int64 startTime = 5;
- optional int64 finishTime = 6;
- optional string lastMessage = 7;
- optional string hostNameOfQM = 8;
- optional int32 queryMasterPort = 9;
- optional int32 queryMasterClientPort = 10;
- optional int32 queryMasterInfoPort = 11;
+ optional KeyValueSetProto contextVars= 3;
+ optional QueryState queryState = 4;
+ optional float progress = 5;
+ optional int64 startTime = 6;
+ optional int64 finishTime = 7;
+ optional string lastMessage = 8;
+ optional string hostNameOfQM = 9;
+ optional int32 queryMasterPort = 10;
+ optional int32 queryMasterClientPort = 11;
+ optional int32 queryMasterInfoPort = 12;
+ optional TableDescProto resultDesc = 13;
}
message StageHistoryProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
index 3d8d70b..0b11566 100644
--- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto
@@ -28,9 +28,5 @@ import "PrimitiveProtos.proto";
import "ClientProtos.proto";
service QueryMasterClientProtocolService {
- rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
- rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
- rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
- rpc closeQuery(QueryIdProto) returns (BoolProto);
rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index c413b65..249d335 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -345,6 +345,7 @@ public class TajoMasterClientService extends AbstractService {
}
GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder();
+ builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
// If we cannot the QueryInfo instance from the finished list,
// the query result was expired due to timeout.
@@ -354,20 +355,16 @@ public class TajoMasterClientService extends AbstractService {
return builder.build();
}
- try {
- //TODO After implementation Tajo's user security feature, Should be modified.
- builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
- } catch (IOException e) {
- LOG.warn("Can't get current user name");
- }
switch (queryInfo.getQueryState()) {
case QUERY_SUCCEEDED:
- // TODO check this logic needed
- //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+ if (queryInfo.hasResultdesc()) {
+ builder.setTableDesc(queryInfo.getResultDesc().getProto());
+ }
break;
case QUERY_FAILED:
case QUERY_ERROR:
builder.setErrorMessage("Query " + queryId + " is failed");
+ break;
default:
builder.setErrorMessage("Query " + queryId + " is still running");
}
@@ -479,6 +476,11 @@ public class TajoMasterClientService extends AbstractService {
if (queryInfo != null) {
builder.setResultCode(ResultCode.OK);
builder.setState(queryInfo.getQueryState());
+
+ boolean isCreateTable = queryInfo.getQueryContext().isCreateTable();
+ boolean isInsert = queryInfo.getQueryContext().isInsert();
+ builder.setHasResult(!(isCreateTable || isInsert));
+
builder.setProgress(queryInfo.getProgress());
builder.setSubmitTime(queryInfo.getStartTime());
if(queryInfo.getQueryMasterHost() != null) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index e361c7f..ca0bd72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -24,13 +24,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -39,6 +36,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResourceManager;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
@@ -57,8 +55,6 @@ public class QueryInProgress extends CompositeService {
private Session session;
- private QueryContext queryContext;
-
private TajoAsyncDispatcher dispatcher;
private LogicalRootNode plan;
@@ -75,8 +71,6 @@ public class QueryInProgress extends CompositeService {
private QueryMasterProtocolService queryMasterRpcClient;
- private ContainerProtocol.TajoContainerIdProto qmContainerId;
-
public QueryInProgress(
TajoMaster.MasterContext masterContext,
Session session,
@@ -85,11 +79,10 @@ public class QueryInProgress extends CompositeService {
super(QueryInProgress.class.getName());
this.masterContext = masterContext;
this.session = session;
- this.queryContext = queryContext;
this.queryId = queryId;
this.plan = plan;
- queryInfo = new QueryInfo(queryId, sql, jsonExpr);
+ queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
queryInfo.setStartTime(System.currentTimeMillis());
}
@@ -145,7 +138,7 @@ public class QueryInProgress extends CompositeService {
}
if(queryMasterRpc != null) {
- RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc);
+ RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
}
masterContext.getHistoryWriter().appendHistory(queryInfo);
@@ -212,7 +205,7 @@ public class QueryInProgress extends CompositeService {
InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
LOG.info("Connect to QueryMaster:" + addr);
queryMasterRpc =
- RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true);
+ RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
queryMasterRpcClient = queryMasterRpc.getStub();
}
@@ -235,8 +228,8 @@ public class QueryInProgress extends CompositeService {
QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
builder.setQueryId(queryId.getProto())
+ .setQueryContext(queryInfo.getQueryContext().getProto())
.setSession(session.getProto())
- .setQueryContext(queryContext.getProto())
.setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
@@ -267,28 +260,37 @@ public class QueryInProgress extends CompositeService {
private void heartbeat(QueryInfo queryInfo) {
LOG.info("Received QueryMaster heartbeat:" + queryInfo);
- this.queryInfo.setQueryState(queryInfo.getQueryState());
- this.queryInfo.setProgress(queryInfo.getProgress());
- this.queryInfo.setFinishTime(queryInfo.getFinishTime());
- if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
- this.queryInfo.setLastMessage(queryInfo.getLastMessage());
- LOG.info(queryId + queryInfo.getLastMessage());
- }
- if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
- //TODO needed QueryMaster's detail status(failed before or after launching worker)
- //queryMasterStopped.set(true);
- LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
- }
+ // to avoid partial update by different heartbeats
+ synchronized (this.queryInfo) {
- if(!querySubmitted.get()) {
- getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
- }
+ // terminal state will let client to retrieve a query result
+ // So, we must set the query result before changing query state
+ if (isFinishState(queryInfo.getQueryState())) {
+ if (queryInfo.hasResultdesc()) {
+ this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+ }
+ }
+
+ this.queryInfo.setQueryState(queryInfo.getQueryState());
+ this.queryInfo.setProgress(queryInfo.getProgress());
+ this.queryInfo.setFinishTime(queryInfo.getFinishTime());
- if(isFinishState(this.queryInfo.getQueryState())) {
- getEventHandler().handle(
- new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+ // Update diagnosis message
+ if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+ this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+ LOG.info(queryId + queryInfo.getLastMessage());
+ }
+
+ // if any error occurs, print outs the error message
+ if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+ LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+ }
+
+
+ if (isFinishState(this.queryInfo.getQueryState())) {
+ stop();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 00b95ac..559fc14 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -22,7 +22,9 @@ package org.apache.tajo.master.querymaster;
import com.google.gson.annotations.Expose;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.util.TajoIdUtils;
@@ -31,15 +33,17 @@ import org.apache.tajo.util.history.History;
public class QueryInfo implements GsonObject, History {
private QueryId queryId;
@Expose
+ private QueryContext context;
+ @Expose
private String sql;
@Expose
- private TajoProtos.QueryState queryState;
+ private volatile TajoProtos.QueryState queryState;
@Expose
- private float progress;
+ private volatile float progress;
@Expose
- private long startTime;
+ private volatile long startTime;
@Expose
- private long finishTime;
+ private volatile long finishTime;
@Expose
private String lastMessage;
@Expose
@@ -52,18 +56,22 @@ public class QueryInfo implements GsonObject, History {
private int queryMasterInfoPort;
@Expose
private String queryIdStr;
+ @Expose
+ private volatile TableDesc resultDesc;
private String jsonExpr;
public QueryInfo(QueryId queryId) {
- this(queryId, null, null);
+ this(queryId, null, null, null);
}
- public QueryInfo(QueryId queryId, String sql, String jsonExpr) {
+ public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) {
this.queryId = queryId;
this.queryIdStr = queryId.toString();
+ this.context = queryContext;
this.sql = sql;
this.jsonExpr = jsonExpr;
+
this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
}
@@ -71,6 +79,10 @@ public class QueryInfo implements GsonObject, History {
return queryId;
}
+ public QueryContext getQueryContext() {
+ return context;
+ }
+
public String getSql() {
return sql;
}
@@ -147,6 +159,18 @@ public class QueryInfo implements GsonObject, History {
this.progress = progress;
}
+ public void setResultDesc(TableDesc result) {
+ this.resultDesc = result;
+ }
+
+ public boolean hasResultdesc() {
+ return resultDesc != null;
+ }
+
+ public TableDesc getResultDesc() {
+ return resultDesc;
+ }
+
@Override
public String toString() {
return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster="
@@ -182,6 +206,7 @@ public class QueryInfo implements GsonObject, History {
builder.setQueryId(queryId.toString())
.setQueryState(queryState)
+ .setContextVars(context.getProto())
.setProgress(progress)
.setStartTime(startTime)
.setFinishTime(finishTime)
@@ -189,6 +214,10 @@ public class QueryInfo implements GsonObject, History {
.setQueryMasterClientPort(queryMasterClientPort)
.setQueryMasterInfoPort(queryMasterInfoPort);
+ if (resultDesc != null) {
+ builder.setResultDesc(resultDesc.getProto());
+ }
+
if (sql != null) {
builder.setSql(sql);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index ddbd3e1..34a0d01 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
@@ -300,6 +301,10 @@ public class QueryJobManager extends CompositeService {
queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime());
}
+ if (queryHeartbeat.hasResultDesc()) {
+ queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc()));
+ }
+
return queryInfo;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 7ddd787..7623026 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -454,10 +454,12 @@ public class QueryMaster extends CompositeService implements EventHandler {
TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder();
builder.setConnectionInfo(workerContext.getConnectionInfo().getProto());
- builder.setState(queryMasterTask.getState());
builder.setQueryId(queryMasterTask.getQueryId().getProto());
-
+ builder.setState(queryMasterTask.getState());
if (queryMasterTask.getQuery() != null) {
+ if (queryMasterTask.getQuery().getResultDesc() != null) {
+ builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
+ }
builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index 0f4a60c..1c83110 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -24,31 +24,20 @@ import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
import org.apache.tajo.ipc.ClientProtos.ResultCode;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.querymaster.Query;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.rpc.BlockingRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.history.QueryHistory;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
public class TajoWorkerClientService extends AbstractService {
private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
@@ -79,15 +68,12 @@ public class TajoWorkerClientService extends AbstractService {
this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
// init RPC Server in constructor cause Heartbeat Thread use bindAddr
- // Setup RPC server
try {
- // TODO initial port num is value of config and find unused port with sequence
InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
if (initIsa.getAddress() == null) {
throw new IllegalArgumentException("Failed resolve of " + initIsa);
}
- // TODO blocking/non-blocking??
int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM);
this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum);
this.rpcServer.start();
@@ -124,119 +110,6 @@ public class TajoWorkerClientService extends AbstractService {
public class TajoWorkerClientProtocolServiceHandler
implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
- @Override
- public PrimitiveProtos.BoolProto updateSessionVariables(
- RpcController controller,
- ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
- return null;
- }
-
- private boolean hasResultTableDesc(QueryContext queryContext) {
- return !(queryContext.isCreateTable() || queryContext.isInsert());
- }
-
- @Override
- public ClientProtos.GetQueryResultResponse getQueryResult(
- RpcController controller,
- ClientProtos.GetQueryResultRequest request) throws ServiceException {
- QueryId queryId = new QueryId(request.getQueryId());
- QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
-
- ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
- try {
- builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
- } catch (IOException e) {
- LOG.warn("Can't get current user name");
- }
-
- if(queryMasterTask == null || queryMasterTask.getQuery() == null) {
- builder.setErrorMessage("No Query for " + queryId);
- } else {
- switch (queryMasterTask.getState()) {
- case QUERY_SUCCEEDED:
-// if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) {
- builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
- //}
- break;
- case QUERY_FAILED:
- case QUERY_ERROR:
- builder.setErrorMessage("Query " + queryId + " is failed");
- default:
- builder.setErrorMessage("Query " + queryId + " is still running");
- }
- }
- return builder.build();
- }
-
- @Override
- public ClientProtos.GetQueryStatusResponse getQueryStatus(
- RpcController controller,
- ClientProtos.GetQueryStatusRequest request) throws ServiceException {
- ClientProtos.GetQueryStatusResponse.Builder builder
- = ClientProtos.GetQueryStatusResponse.newBuilder();
- QueryId queryId = new QueryId(request.getQueryId());
-
- builder.setQueryId(request.getQueryId());
-
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- builder.setResultCode(ClientProtos.ResultCode.OK);
- builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
- } else {
- QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
-
- builder.setResultCode(ClientProtos.ResultCode.OK);
- builder.setQueryMasterHost(bindAddr.getHostName());
- builder.setQueryMasterPort(bindAddr.getPort());
-
- if (queryMasterTask == null) {
- queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
- }
- if (queryMasterTask == null) {
- builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
- return builder.build();
- }
-
- builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext()));
-
- queryMasterTask.touchSessionTime();
- Query query = queryMasterTask.getQuery();
-
- if (query != null) {
- builder.setState(queryMasterTask.getState());
- builder.setProgress(query.getProgress());
- builder.setSubmitTime(query.getAppSubmitTime());
- if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(query.getFinishTime());
- } else {
- builder.setFinishTime(System.currentTimeMillis());
- }
- }
- Collection<TajoWorkerProtocol.TaskFatalErrorReport> diagnostics = queryMasterTask.getDiagnostics();
- if(!diagnostics.isEmpty()) {
- TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next();
- builder.setErrorMessage(firstError.getErrorMessage());
- builder.setErrorTrace(firstError.getErrorTrace());
- }
-
- if (queryMasterTask.isInitError()) {
- Throwable initError = queryMasterTask.getInitError();
- builder.setErrorMessage(
- initError.getMessage() == null ? initError.getClass().getName() : initError.getMessage());
- builder.setErrorTrace(StringUtils.stringifyException(initError));
- builder.setState(queryMasterTask.getState());
- }
- }
- return builder.build();
- }
-
- @Override
- public PrimitiveProtos.BoolProto closeQuery (
- RpcController controller,
- TajoIdProtos.QueryIdProto request) throws ServiceException {
- final QueryId queryId = new QueryId(request);
- LOG.info("Stop Query:" + queryId);
- return BOOL_TRUE;
- }
@Override
public GetQueryHistoryResponse getQueryHistory(RpcController controller, QueryIdRequest request) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index e5eab4f..cc83e47 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -66,9 +66,10 @@ message TajoHeartbeat {
required WorkerConnectionInfoProto connectionInfo = 1;
optional QueryIdProto queryId = 2;
optional QueryState state = 3;
- optional string statusMessage = 4;
- optional float queryProgress = 5;
- optional int64 queryFinishTime = 6;
+ optional TableDescProto resultDesc = 4;
+ optional string statusMessage = 5;
+ optional float queryProgress = 6;
+ optional int64 queryFinishTime = 7;
}
message TajoHeartbeatResponse {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cb9793b9/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 5ff637c..841be45 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -44,8 +44,8 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.*;
import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.Stage;
import org.apache.tajo.master.querymaster.StageState;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
@@ -781,14 +781,16 @@ public class TajoTestingCluster {
}
public void waitForQueryRunning(QueryId queryId, int delay) throws Exception {
- QueryMasterTask qmt = null;
+ QueryInProgress qip = null;
int i = 0;
- while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
+ while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) {
try {
Thread.sleep(delay);
- if(qmt == null){
- qmt = getQueryMasterTask(queryId);
+ if(qip == null){
+
+ TajoMaster master = getMaster();
+ qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId);
}
} catch (InterruptedException e) {
}
@@ -824,16 +826,4 @@ public class TajoTestingCluster {
}
}
}
-
- public QueryMasterTask getQueryMasterTask(QueryId queryId) {
- QueryMasterTask qmt = null;
- for (TajoWorker worker : getTajoWorkers()) {
- qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId);
- if (qmt != null) {
- break;
- }
- }
-
- return qmt;
- }
}