You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/25 02:36:21 UTC
[06/13] TAJO-353: Add Database support to Tajo. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index dc70f23..a504e7b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -27,11 +27,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
@@ -47,15 +45,22 @@ import org.apache.tajo.master.querymaster.QueryJobEvent;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.Worker;
import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.session.InvalidSessionException;
+import org.apache.tajo.master.session.NoSuchSessionVariableException;
+import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
public class TajoMasterClientService extends AbstractService {
private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
private final MasterContext context;
@@ -111,32 +116,150 @@ public class TajoMasterClientService extends AbstractService {
return this.bindAddress;
}
- public int getHttpPort() {
- return 0;
- }
/////////////////////////////////////////////////////////////////////////////
// TajoMasterClientProtocolService
/////////////////////////////////////////////////////////////////////////////
public class TajoMasterClientProtocolServiceHandler implements TajoMasterClientProtocolService.BlockingInterface {
@Override
- public BoolProto updateSessionVariables(RpcController controller,
- UpdateSessionVariableRequest request)
+ public CreateSessionResponse createSession(RpcController controller, CreateSessionRequest request)
throws ServiceException {
- return null;
+ try {
+ // create a new session with base database name. If no database name is give, we use default database.
+ String databaseName = request.hasBaseDatabaseName() ? request.getBaseDatabaseName() : DEFAULT_DATABASE_NAME;
+
+ if (!context.getCatalog().existDatabase(databaseName)) {
+ LOG.info("Session creation is canceled due to absent base database \"" + databaseName + "\".");
+ throw new NoSuchDatabaseException(databaseName);
+ }
+
+ String sessionId =
+ context.getSessionManager().createSession(request.getUsername(), databaseName);
+ CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+ builder.setState(CreateSessionResponse.ResultState.SUCCESS);
+ builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build());
+ return builder.build();
+ } catch (NoSuchDatabaseException nsde) {
+ CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+ builder.setState(CreateSessionResponse.ResultState.FAILED);
+ builder.setMessage(nsde.getMessage());
+ return builder.build();
+ } catch (InvalidSessionException e) {
+ CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+ builder.setState(CreateSessionResponse.ResultState.FAILED);
+ builder.setMessage(e.getMessage());
+ return builder.build();
+ }
}
@Override
- public ExplainQueryResponse explainQuery(RpcController controller,
- ExplainQueryRequest request)
+ public BoolProto removeSession(RpcController controller, TajoIdProtos.SessionIdProto request)
+ throws ServiceException {
+ if (request != null) {
+ context.getSessionManager().removeSession(request.getId());
+ }
+ return ProtoUtil.TRUE;
+ }
+
+ @Override
+ public BoolProto updateSessionVariables(RpcController controller, UpdateSessionVariableRequest request)
+ throws ServiceException {
+ try {
+ String sessionId = request.getSessionId().getId();
+ for (CatalogProtos.KeyValueProto kv : request.getSetVariables().getKeyvalList()) {
+ context.getSessionManager().setVariable(sessionId, kv.getKey(), kv.getValue());
+ }
+ for (String unsetVariable : request.getUnsetVariablesList()) {
+ context.getSessionManager().removeVariable(sessionId, unsetVariable);
+ }
+ return ProtoUtil.TRUE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public StringProto getSessionVariable(RpcController controller, SessionedStringProto request)
throws ServiceException {
try {
+ return ProtoUtil.convertString(
+ context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue()));
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+ if (value != null) {
+ return ProtoUtil.TRUE;
+ } else {
+ return ProtoUtil.FALSE;
+ }
+ } catch (NoSuchSessionVariableException nssv) {
+ return ProtoUtil.FALSE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public CatalogProtos.KeyValueSetProto getAllSessionVariables(RpcController controller,
+ TajoIdProtos.SessionIdProto request)
+ throws ServiceException {
+ try {
+ String sessionId = request.getId();
+ Options options = new Options();
+ options.putAll(context.getSessionManager().getAllVariables(sessionId));
+ return options.getProto();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public StringProto getCurrentDatabase(RpcController controller, TajoIdProtos.SessionIdProto request)
+ throws ServiceException {
+ try {
+ String sessionId = request.getId();
+ return ProtoUtil.convertString(context.getSessionManager().getSession(sessionId).getCurrentDatabase());
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public BoolProto selectDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ String sessionId = request.getSessionId().getId();
+ String databaseName = CatalogUtil.normalizeIdentifier(request.getValue());
+
+ if (context.getCatalog().existDatabase(databaseName)) {
+ context.getSessionManager().getSession(sessionId).selectDatabase(databaseName);
+ return ProtoUtil.TRUE;
+ } else {
+ throw new ServiceException(new NoSuchDatabaseException(databaseName));
+ }
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
+ }
+
+ @Override
+ public ExplainQueryResponse explainQuery(RpcController controller,
+ SessionedStringProto request) throws ServiceException {
+
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
if(LOG.isDebugEnabled()) {
- LOG.debug("ExplainQuery [" + request.getQuery() + "]");
+ LOG.debug("ExplainQuery [" + request.getValue() + "]");
}
ClientProtos.ExplainQueryResponse.Builder responseBuilder = ClientProtos.ExplainQueryResponse.newBuilder();
responseBuilder.setResultCode(ResultCode.OK);
- String plan = context.getGlobalEngine().explainQuery(request.getQuery());
+ String plan = context.getGlobalEngine().explainQuery(session, request.getValue());
if(LOG.isDebugEnabled()) {
LOG.debug("ExplainQuery [" + plan + "]");
}
@@ -151,15 +274,16 @@ public class TajoMasterClientService extends AbstractService {
}
}
@Override
- public GetQueryStatusResponse submitQuery(RpcController controller,
- QueryRequest request)
- throws ServiceException {
+ public GetQueryStatusResponse submitQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
if(LOG.isDebugEnabled()) {
LOG.debug("Query [" + request.getQuery() + "] is submitted");
}
- return context.getGlobalEngine().executeQuery(request.getQuery());
+ return context.getGlobalEngine().executeQuery(session, request.getQuery());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
@@ -174,119 +298,135 @@ public class TajoMasterClientService extends AbstractService {
}
@Override
- public UpdateQueryResponse updateQuery(RpcController controller,
- QueryRequest request)
- throws ServiceException {
+ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest request) throws ServiceException {
- UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
try {
- context.getGlobalEngine().updateQuery(request.getQuery());
- builder.setResultCode(ResultCode.OK);
- return builder.build();
- } catch (Exception e) {
- builder.setResultCode(ResultCode.ERROR);
- if (e.getMessage() == null) {
- builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+ try {
+ context.getGlobalEngine().updateQuery(session, request.getQuery());
+ builder.setResultCode(ResultCode.OK);
+ return builder.build();
+ } catch (Exception e) {
+ builder.setResultCode(ResultCode.ERROR);
+ if (e.getMessage() == null) {
+ builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+ }
+ return builder.build();
}
- return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
}
}
@Override
public GetQueryResultResponse getQueryResult(RpcController controller,
- GetQueryResultRequest request)
- throws ServiceException {
- QueryId queryId = new QueryId(request.getQueryId());
- QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
- GetQueryResultResponse.Builder builder
- = GetQueryResultResponse.newBuilder();
-
+ GetQueryResultRequest request) throws ServiceException {
try {
- //TODO After implementation Tajo's user security feature, Should be modified.
- builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
- } catch (IOException e) {
- LOG.warn("Can't get current user name");
- }
- switch (queryInfo.getQueryState()) {
- case QUERY_SUCCEEDED:
- // TODO check this logic needed
- //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
- break;
- case QUERY_FAILED:
- case QUERY_ERROR:
- builder.setErrorMessage("Query " + queryId + " is failed");
- default:
- builder.setErrorMessage("Query " + queryId + " is still running");
- }
+ context.getSessionManager().touch(request.getSessionId().getId());
+ QueryId queryId = new QueryId(request.getQueryId());
+ QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ GetQueryResultResponse.Builder builder
+ = GetQueryResultResponse.newBuilder();
+
+ try {
+ //TODO After implementation Tajo's user security feature, Should be modified.
+ builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+ } catch (IOException e) {
+ LOG.warn("Can't get current user name");
+ }
+ switch (queryInfo.getQueryState()) {
+ case QUERY_SUCCEEDED:
+ // TODO check this logic needed
+ //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+ break;
+ case QUERY_FAILED:
+ case QUERY_ERROR:
+ builder.setErrorMessage("Query " + queryId + " is failed");
+ default:
+ builder.setErrorMessage("Query " + queryId + " is still running");
+ }
- return builder.build();
+ return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
- public GetQueryListResponse getRunningQueryList(RpcController controller,
- GetQueryListRequest request)
+ public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
+
throws ServiceException {
- GetQueryListResponse.Builder builder
- = GetQueryListResponse.newBuilder();
-
- Collection<QueryInProgress> queries
- = context.getQueryJobManager().getRunningQueries();
-
- BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
-
- for (QueryInProgress queryInProgress : queries) {
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
- infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
- infoBuilder.setState(queryInfo.getQueryState());
- infoBuilder.setQuery(queryInfo.getSql());
- infoBuilder.setStartTime(queryInfo.getStartTime());
- long endTime = (queryInfo.getFinishTime() == 0) ?
- System.currentTimeMillis() : queryInfo.getFinishTime();
- infoBuilder.setFinishTime(endTime);
- infoBuilder.setProgress(queryInfo.getProgress());
- infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
- infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-
- builder.addQueryList(infoBuilder.build());
- }
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();
- GetQueryListResponse result = builder.build();
- return result;
+ Collection<QueryInProgress> queries
+ = context.getQueryJobManager().getRunningQueries();
+
+ BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+
+ for (QueryInProgress queryInProgress : queries) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+ infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ infoBuilder.setState(queryInfo.getQueryState());
+ infoBuilder.setQuery(queryInfo.getSql());
+ infoBuilder.setStartTime(queryInfo.getStartTime());
+ long endTime = (queryInfo.getFinishTime() == 0) ?
+ System.currentTimeMillis() : queryInfo.getFinishTime();
+ infoBuilder.setFinishTime(endTime);
+ infoBuilder.setProgress(queryInfo.getProgress());
+ infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+ infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+
+ builder.addQueryList(infoBuilder.build());
+ }
+
+ GetQueryListResponse result = builder.build();
+ return result;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
- public GetQueryListResponse getFinishedQueryList(RpcController controller,
- GetQueryListRequest request)
+ public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
throws ServiceException {
- GetQueryListResponse.Builder builder
- = GetQueryListResponse.newBuilder();
- Collection<QueryInProgress> queries
- = context.getQueryJobManager().getFinishedQueries();
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();
- BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+ Collection<QueryInProgress> queries
+ = context.getQueryJobManager().getFinishedQueries();
- for (QueryInProgress queryInProgress : queries) {
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
- infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
- infoBuilder.setState(queryInfo.getQueryState());
- infoBuilder.setQuery(queryInfo.getSql());
- infoBuilder.setStartTime(queryInfo.getStartTime());
- long endTime = (queryInfo.getFinishTime() == 0) ?
- System.currentTimeMillis() : queryInfo.getFinishTime();
- infoBuilder.setFinishTime(endTime);
- infoBuilder.setProgress(queryInfo.getProgress());
- infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
- infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-
- builder.addQueryList(infoBuilder.build());
- }
+ for (QueryInProgress queryInProgress : queries) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
- GetQueryListResponse result = builder.build();
- return result;
+ infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+ infoBuilder.setState(queryInfo.getQueryState());
+ infoBuilder.setQuery(queryInfo.getSql());
+ infoBuilder.setStartTime(queryInfo.getStartTime());
+ long endTime = (queryInfo.getFinishTime() == 0) ?
+ System.currentTimeMillis() : queryInfo.getFinishTime();
+ infoBuilder.setFinishTime(endTime);
+ infoBuilder.setProgress(queryInfo.getProgress());
+ infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+ infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+
+ builder.addQueryList(infoBuilder.build());
+ }
+
+ GetQueryListResponse result = builder.build();
+ return result;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
@@ -294,136 +434,242 @@ public class TajoMasterClientService extends AbstractService {
GetQueryStatusRequest request)
throws ServiceException {
- GetQueryStatusResponse.Builder builder
- = GetQueryStatusResponse.newBuilder();
- QueryId queryId = new QueryId(request.getQueryId());
- builder.setQueryId(request.getQueryId());
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- builder.setResultCode(ResultCode.OK);
- builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
- } else {
- QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
- if (queryInProgress != null) {
- QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder();
+ QueryId queryId = new QueryId(request.getQueryId());
+ builder.setQueryId(request.getQueryId());
+
+ if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
builder.setResultCode(ResultCode.OK);
- builder.setState(queryInfo.getQueryState());
- builder.setProgress(queryInfo.getProgress());
- builder.setSubmitTime(queryInfo.getStartTime());
- if(queryInfo.getQueryMasterHost() != null) {
- builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
- builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
- }
- //builder.setInitTime(queryJobManager.getInitializationTime());
- //builder.setHasResult(!queryJobManager.isCreateTableStmt());
- if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(queryInfo.getFinishTime());
+ builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+ } else {
+ QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+ if (queryInProgress != null) {
+ QueryInfo queryInfo = queryInProgress.getQueryInfo();
+ builder.setResultCode(ResultCode.OK);
+ builder.setState(queryInfo.getQueryState());
+ builder.setProgress(queryInfo.getProgress());
+ builder.setSubmitTime(queryInfo.getStartTime());
+ if(queryInfo.getQueryMasterHost() != null) {
+ builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+ builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+ }
+ //builder.setInitTime(queryJobManager.getInitializationTime());
+ //builder.setHasResult(!queryJobManager.isCreateTableStmt());
+ if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ builder.setFinishTime(queryInfo.getFinishTime());
+ } else {
+ builder.setFinishTime(System.currentTimeMillis());
+ }
} else {
- builder.setFinishTime(System.currentTimeMillis());
+ builder.setResultCode(ResultCode.ERROR);
+ builder.setErrorMessage("No such query: " + queryId.toString());
}
- } else {
- builder.setResultCode(ResultCode.ERROR);
- builder.setErrorMessage("No such query: " + queryId.toString());
}
- }
+ return builder.build();
- return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
/**
* It is invoked by TajoContainerProxy.
*/
@Override
- public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
- QueryId queryId = new QueryId(request);
- QueryJobManager queryJobManager = context.getQueryJobManager();
- queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
- new QueryInfo(queryId)));
- return BOOL_TRUE;
+ public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ QueryId queryId = new QueryId(request.getQueryId());
+ QueryJobManager queryJobManager = context.getQueryJobManager();
+ queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+ new QueryInfo(queryId)));
+ return BOOL_TRUE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
public GetClusterInfoResponse getClusterInfo(RpcController controller,
GetClusterInfoRequest request)
throws ServiceException {
- GetClusterInfoResponse.Builder builder
- = GetClusterInfoResponse.newBuilder();
-
- Map<String, Worker> workers = context.getResourceManager().getWorkers();
-
- List<String> wokerKeys = new ArrayList<String>(workers.keySet());
- Collections.sort(wokerKeys);
-
- WorkerResourceInfo.Builder workerBuilder
- = WorkerResourceInfo.newBuilder();
-
- for(Worker worker: workers.values()) {
- WorkerResource workerResource = worker.getResource();
- workerBuilder.setAllocatedHost(worker.getHostName());
- workerBuilder.setDiskSlots(workerResource.getDiskSlots());
- workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
- workerBuilder.setMemoryMB(workerResource.getMemoryMB());
- workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
- workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
- workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
- workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
- workerBuilder.setWorkerStatus(worker.getState().toString());
- workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
- workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
- workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
- workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
- workerBuilder.setClientPort(worker.getClientPort());
- workerBuilder.setPullServerPort(worker.getPullServerPort());
- workerBuilder.setHttpPort(worker.getHttpPort());
- workerBuilder.setMaxHeap(workerResource.getMaxHeap());
- workerBuilder.setFreeHeap(workerResource.getFreeHeap());
- workerBuilder.setTotalHeap(workerResource.getTotalHeap());
- workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
- workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
-
- builder.addWorkerList(workerBuilder.build());
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
+
+ Map<String, Worker> workers = context.getResourceManager().getWorkers();
+
+ List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+ Collections.sort(wokerKeys);
+
+ WorkerResourceInfo.Builder workerBuilder
+ = WorkerResourceInfo.newBuilder();
+
+ for(Worker worker: workers.values()) {
+ WorkerResource workerResource = worker.getResource();
+ workerBuilder.setAllocatedHost(worker.getHostName());
+ workerBuilder.setDiskSlots(workerResource.getDiskSlots());
+ workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
+ workerBuilder.setMemoryMB(workerResource.getMemoryMB());
+ workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
+ workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
+ workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
+ workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
+ workerBuilder.setWorkerStatus(worker.getState().toString());
+ workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
+ workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
+ workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
+ workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
+ workerBuilder.setClientPort(worker.getClientPort());
+ workerBuilder.setPullServerPort(worker.getPullServerPort());
+ workerBuilder.setHttpPort(worker.getHttpPort());
+ workerBuilder.setMaxHeap(workerResource.getMaxHeap());
+ workerBuilder.setFreeHeap(workerResource.getFreeHeap());
+ workerBuilder.setTotalHeap(workerResource.getTotalHeap());
+ workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
+ workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
+
+ builder.addWorkerList(workerBuilder.build());
+ }
+
+ return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
}
+ }
- return builder.build();
+ @Override
+ public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
}
@Override
- public BoolProto existTable(RpcController controller,
- StringProto tableNameProto)
- throws ServiceException {
- String tableName = tableNameProto.getValue();
- if (catalog.existsTable(tableName)) {
- return BOOL_TRUE;
- } else {
- return BOOL_FALSE;
+ public BoolProto existDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ if (catalog.existDatabase(request.getValue())) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
+ request) throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getId());
+ return ProtoUtil.convertStrings(catalog.getAllDatabaseNames());
+ } catch (Throwable e) {
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public BoolProto existTable(RpcController controller, SessionedStringProto request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String databaseName;
+ String tableName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getValue();
+ }
+
+ if (catalog.existsTable(databaseName, tableName)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (Throwable e) {
+ throw new ServiceException(e);
}
}
@Override
public GetTableListResponse getTableList(RpcController controller,
- GetTableListRequest request)
- throws ServiceException {
- Collection<String> tableNames = catalog.getAllTableNames();
- GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
- builder.addAllTables(tableNames);
- return builder.build();
+ GetTableListRequest request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ String databaseName;
+ if (request.hasDatabaseName()) {
+ databaseName = request.getDatabaseName();
+ } else {
+ databaseName = session.getCurrentDatabase();
+ }
+ Collection<String> tableNames = catalog.getAllTableNames(databaseName);
+ GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
+ builder.addAllTables(tableNames);
+ return builder.build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
- public TableResponse getTableDesc(RpcController controller,
- GetTableDescRequest request)
- throws ServiceException {
- String name = request.getTableName();
- if (catalog.existsTable(name)) {
- return TableResponse.newBuilder()
- .setResultCode(ResultCode.OK)
- .setTableDesc(catalog.getTableDesc(name).getProto())
- .build();
- } else {
- return TableResponse.newBuilder()
- .setResultCode(ResultCode.ERROR)
- .setErrorMessage("ERROR: no such a table: " + request.getTableName())
- .build();
+ public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
+ try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String databaseName;
+ String tableName;
+ if (CatalogUtil.isFQTableName(request.getTableName())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getTableName();
+ }
+
+ if (catalog.existsTable(databaseName, tableName)) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
+ .setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto())
+ .build();
+ } else {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+ .build();
+ }
+ } catch (Throwable t) {
+ throw new ServiceException(t);
}
}
@@ -431,6 +677,8 @@ public class TajoMasterClientService extends AbstractService {
public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
throws ServiceException {
try {
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
Path path = new Path(request.getPath());
FileSystem fs = path.getFileSystem(conf);
@@ -447,8 +695,8 @@ public class TajoMasterClientService extends AbstractService {
TableDesc desc;
try {
- desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
- meta, path, false, partitionDesc);
+ desc = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema,
+ meta, path, true, partitionDesc, false);
} catch (Exception e) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
@@ -458,6 +706,10 @@ public class TajoMasterClientService extends AbstractService {
return TableResponse.newBuilder()
.setResultCode(ResultCode.OK)
.setTableDesc(desc.getProto()).build();
+ } catch (InvalidSessionException ise) {
+ return TableResponse.newBuilder()
+ .setResultCode(ResultCode.ERROR)
+ .setErrorMessage(ise.getMessage()).build();
} catch (IOException ioe) {
return TableResponse.newBuilder()
.setResultCode(ResultCode.ERROR)
@@ -467,30 +719,43 @@ public class TajoMasterClientService extends AbstractService {
@Override
public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
- context.getGlobalEngine().dropTable(dropTable.getName(), dropTable.getPurge());
- return BOOL_TRUE;
+ try {
+ Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId());
+ context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge());
+ return BOOL_TRUE;
+ } catch (Throwable t) {
+ throw new ServiceException(t);
+ }
}
@Override
- public FunctionResponse getFunctionList(RpcController controller, StringProto request) throws ServiceException {
- String functionName = request.getValue();
- Collection<FunctionDesc> functions = catalog.getFunctions();
+ public FunctionResponse getFunctionList(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
- List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
- for (FunctionDesc eachFunction: functions) {
- if (functionName == null || functionName.isEmpty()) {
- functionProtos.add(eachFunction.getProto());
- } else {
- if(functionName.equals(eachFunction.getSignature())) {
+ String functionName = request.getValue();
+ Collection<FunctionDesc> functions = catalog.getFunctions();
+
+ List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
+
+ for (FunctionDesc eachFunction: functions) {
+ if (functionName == null || functionName.isEmpty()) {
functionProtos.add(eachFunction.getProto());
+ } else {
+ if(functionName.equals(eachFunction.getSignature())) {
+ functionProtos.add(eachFunction.getProto());
+ }
}
}
+ return FunctionResponse.newBuilder()
+ .setResultCode(ResultCode.OK)
+ .addAllFunctions(functionProtos)
+ .build();
+ } catch (Throwable t) {
+ throw new ServiceException(t);
}
- return FunctionResponse.newBuilder()
- .setResultCode(ResultCode.OK)
- .addAllFunctions(functionProtos)
- .build();
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index 28b5f08..8b18b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -185,7 +185,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
//
// @Override
// protected String getRunnerClass() {
-// return TaskRunner.class.getCanonicalName();
+// return TaskRunner.class.getCanonicalSignature();
// }
//
// @Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 2c8b822..dd996e6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.tajo.QueryId;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.session.Session;
/**
* This event is conveyed to QueryMaster.
@@ -30,14 +31,17 @@ public class QueryStartEvent extends AbstractEvent {
QUERY_START
}
- private QueryId queryId;
- private QueryContext queryContext;
- private String sql;
- private String logicalPlanJson;
+ private final QueryId queryId;
+ private final Session session;
+ private final QueryContext queryContext;
+ private final String sql;
+ private final String logicalPlanJson;
- public QueryStartEvent(QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
+ public QueryStartEvent(QueryId queryId, Session session, QueryContext queryContext, String sql,
+ String logicalPlanJson) {
super(EventType.QUERY_START);
this.queryId = queryId;
+ this.session = session;
this.queryContext = queryContext;
this.sql = sql;
this.logicalPlanJson = logicalPlanJson;
@@ -47,6 +51,10 @@ public class QueryStartEvent extends AbstractEvent {
return queryId;
}
+ public Session getSession() {
+ return this.session;
+ }
+
public QueryContext getQueryContext() {
return this.queryContext;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
index 08fff53..7c3d283 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
@@ -26,6 +26,8 @@ import org.apache.tajo.master.TajoMaster;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
public class CatalogMetricsGaugeSet implements MetricSet {
TajoMaster.MasterContext tajoMasterContext;
public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
@@ -38,7 +40,7 @@ public class CatalogMetricsGaugeSet implements MetricSet {
metricsMap.put("numTables", new Gauge<Integer>() {
@Override
public Integer getValue() {
- return tajoMasterContext.getCatalog().getAllTableNames().size();
+ return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
}
});
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 02ed34e..8467b4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -37,12 +37,12 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.InsertNode;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.InsertNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
@@ -134,6 +134,20 @@ public class Query implements EventHandler<QueryEvent> {
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Transitions from QUERY_SUCCEEDED state
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ // ignore-able transitions
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.SUBQUERY_COMPLETED,
+ SUBQUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+ QueryEventType.KILL)
+ .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+
// Transitions from KILL_WAIT state
.addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
QueryEventType.SUBQUERY_COMPLETED,
@@ -433,8 +447,8 @@ public class Query implements EventHandler<QueryEvent> {
}
@Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
- ExecutionBlockId finalExecBlockId,
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId,
Path finalOutputDir) throws Exception {
SubQuery lastStage = query.getSubQuery(finalExecBlockId);
TableMeta meta = lastStage.getTableMeta();
@@ -446,6 +460,7 @@ public class Query implements EventHandler<QueryEvent> {
lastStage.getSchema(),
meta,
finalOutputDir);
+ resultTableDesc.setExternal(true);
stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
resultTableDesc.setStats(stats);
@@ -463,9 +478,8 @@ public class Query implements EventHandler<QueryEvent> {
}
@Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
- ExecutionBlockId finalExecBlockId,
- Path finalOutputDir) throws Exception {
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
CatalogService catalog = context.getWorkerContext().getCatalog();
SubQuery lastStage = query.getSubQuery(finalExecBlockId);
TableMeta meta = lastStage.getTableMeta();
@@ -479,6 +493,7 @@ public class Query implements EventHandler<QueryEvent> {
createTableNode.getTableSchema(),
meta,
finalOutputDir);
+ tableDescTobeCreated.setExternal(createTableNode.isExternal());
if (createTableNode.hasPartition()) {
tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
@@ -488,7 +503,7 @@ public class Query implements EventHandler<QueryEvent> {
tableDescTobeCreated.setStats(stats);
query.setResultDesc(tableDescTobeCreated);
- catalog.addTable(tableDescTobeCreated);
+ catalog.createTable(tableDescTobeCreated);
}
}
@@ -502,9 +517,8 @@ public class Query implements EventHandler<QueryEvent> {
}
@Override
- public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
- ExecutionBlockId finalExecBlockId,
- Path finalOutputDir)
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+ Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
throws Exception {
CatalogService catalog = context.getWorkerContext().getCatalog();
@@ -528,8 +542,8 @@ public class Query implements EventHandler<QueryEvent> {
finalTable.setStats(stats);
if (insertNode.hasTargetTable()) {
- catalog.deleteTable(insertNode.getTableName());
- catalog.addTable(finalTable);
+ catalog.dropTable(insertNode.getTableName());
+ catalog.createTable(finalTable);
}
query.setResultDesc(finalTable);
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index efb9c06..b8f312e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
@@ -52,6 +53,8 @@ public class QueryInProgress extends CompositeService {
private QueryId queryId;
+ private Session session;
+
private QueryContext queryContext;
private TajoAsyncDispatcher dispatcher;
@@ -74,10 +77,12 @@ public class QueryInProgress extends CompositeService {
public QueryInProgress(
TajoMaster.MasterContext masterContext,
+ Session session,
QueryContext queryContext,
QueryId queryId, String sql, LogicalRootNode plan) {
super(QueryInProgress.class.getName());
this.masterContext = masterContext;
+ this.session = session;
this.queryContext = queryContext;
this.queryId = queryId;
this.plan = plan;
@@ -229,6 +234,7 @@ public class QueryInProgress extends CompositeService {
null,
TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
.setQueryId(queryId.getProto())
+ .setSession(session.getProto())
.setQueryContext(queryContext.getProto())
.setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()))
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 6cfb600..aa03501 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
import java.util.Collection;
import java.util.Collections;
@@ -94,9 +95,10 @@ public class QueryJobManager extends CompositeService {
return Collections.unmodifiableCollection(finishedQueries.values());
}
- public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
+ public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, LogicalRootNode plan)
+ throws Exception {
QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, plan);
synchronized(runningQueries) {
runningQueries.put(queryId, queryInProgress);
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 9dddbf7..abdc214 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -371,7 +371,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void handle(QueryStartEvent event) {
LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getSession(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
queryMasterTask.init(systemConf);
queryMasterTask.start();
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index e3e0260..bf59e9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -33,6 +33,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.LazyTaskScheduler;
import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.util.NetUtils;
@@ -225,6 +226,7 @@ public class QueryMasterManagerService extends CompositeService
QueryId queryId = new QueryId(request.getQueryId());
LOG.info("Receive executeQuery request:" + queryId);
queryMaster.handle(new QueryStartEvent(queryId,
+ new Session(request.getSession()),
new QueryContext(request.getQueryContext()), request.getSql().getValue(),
request.getLogicalPlanJson().getValue()));
done.run(TajoWorker.TRUE_PROTO);
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 7719a85..39e2177 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -48,6 +48,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.session.Session;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
@@ -76,6 +77,8 @@ public class QueryMasterTask extends CompositeService {
private QueryId queryId;
+ private Session session;
+
private QueryContext queryContext;
private QueryMasterTaskContext queryTaskContext;
@@ -107,10 +110,13 @@ public class QueryMasterTask extends CompositeService {
private TajoMetrics queryMetrics;
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
+ QueryId queryId, Session session, QueryContext queryContext, String sql,
+ String logicalPlanJson) {
+
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
this.queryId = queryId;
+ this.session = session;
this.queryContext = queryContext;
this.sql = sql;
this.logicalPlanJson = logicalPlanJson;
@@ -306,7 +312,7 @@ public class QueryMasterTask extends CompositeService {
}
LogicalPlan plan = null;
try {
- plan = planner.createPlan(expr);
+ plan = planner.createPlan(session, expr);
optimizer.optimize(plan);
} catch (PlanningException e) {
e.printStackTrace();
@@ -400,7 +406,7 @@ public class QueryMasterTask extends CompositeService {
// Create a subdirectories
defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
- LOG.info("The staging dir '" + outputDir + "' is created.");
+ LOG.info("The staging dir '" + stagingDir + "' is created.");
queryContext.setStagingDir(stagingDir);
/////////////////////////////////////////////////
@@ -481,6 +487,10 @@ public class QueryMasterTask extends CompositeService {
return queryMasterContext;
}
+ public Session getSession() {
+ return session;
+ }
+
public QueryContext getQueryContext() {
return queryContext;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 1bcf38b..4bd7adb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -30,7 +30,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
import org.apache.tajo.rpc.AsyncRpcServer;
import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.ProtoBufUtil;
+import org.apache.tajo.util.ProtoUtil;
import java.io.IOError;
import java.net.InetSocketAddress;
@@ -109,7 +109,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
}
/** The response builder */
- private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoBufUtil.TRUE);
+ private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
return new WorkerStatusEvent(
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
new file mode 100644
index 0000000..3f48ca5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public class InvalidSessionException extends Exception {
+ public InvalidSessionException(String sessionId) {
+ super("Invalid session id \"" + sessionId + "\"");
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..686d860
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public class NoSuchSessionVariableException extends Exception {
+ public NoSuchSessionVariableException(String varname) {
+ super("No such session variable \"" + varname + "\"");
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
new file mode 100644
index 0000000..4d244bf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+public class Session implements SessionConstants, ProtoObject<SessionProto> {
+ private final String sessionId;
+ private final String userName;
+ private final Map<String, String> sessionVariables;
+
+ // transient status
+ private volatile long lastAccessTime;
+ private volatile String currentDatabase;
+
+ public Session(String sessionId, String userName, String databaseName) {
+ this.sessionId = sessionId;
+ this.userName = userName;
+ this.lastAccessTime = System.currentTimeMillis();
+ this.sessionVariables = new HashMap<String, String>();
+ selectDatabase(databaseName);
+ }
+
+ public Session(SessionProto proto) {
+ sessionId = proto.getSessionId();
+ userName = proto.getUsername();
+ currentDatabase = proto.getCurrentDatabase();
+ lastAccessTime = proto.getLastAccessTime();
+ Options options = new Options(proto.getVariables());
+ sessionVariables = options.getAllKeyValus();
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void updateLastAccessTime() {
+ lastAccessTime = System.currentTimeMillis();
+ }
+
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ public void setVariable(String name, String value) {
+ synchronized (sessionVariables) {
+ sessionVariables.put(name, value);
+ }
+ }
+
+ public String getVariable(String name) throws NoSuchSessionVariableException {
+ synchronized (sessionVariables) {
+ if (sessionVariables.containsKey(name)) {
+ return sessionVariables.get(name);
+ } else {
+ throw new NoSuchSessionVariableException(name);
+ }
+ }
+ }
+
+ public void removeVariable(String name) {
+ synchronized (sessionVariables) {
+ sessionVariables.remove(name);
+ }
+ }
+
+ public synchronized Map<String, String> getAllVariables() {
+ synchronized (sessionVariables) {
+ return ImmutableMap.copyOf(sessionVariables);
+ }
+ }
+
+ public void selectDatabase(String databaseName) {
+ this.currentDatabase = databaseName;
+ }
+
+ public String getCurrentDatabase() {
+ return this.currentDatabase;
+ }
+
+ @Override
+ public SessionProto getProto() {
+ SessionProto.Builder builder = SessionProto.newBuilder();
+ builder.setSessionId(sessionId);
+ builder.setUsername(userName);
+ builder.setCurrentDatabase(currentDatabase);
+ builder.setLastAccessTime(lastAccessTime);
+ Options variables = new Options();
+ variables.putAll(this.sessionVariables);
+ builder.setVariables(variables.getProto());
+ return builder.build();
+ }
+
+ public String toString() {
+ return "user=" + userName + ",id=" + sessionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
new file mode 100644
index 0000000..46f49a2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public interface SessionConstants {
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
new file mode 100644
index 0000000..dce3ba6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class SessionEvent extends AbstractEvent<SessionEventType> {
+ private final String sessionId;
+
+ public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+ super(sessionEventType);
+ this.sessionId = sessionId;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
new file mode 100644
index 0000000..64c6fc6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public enum SessionEventType {
+ EXPIRE,
+ PING
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..483920f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+ private EventHandler dispatcher;
+
+ public SessionLivelinessMonitor(Dispatcher d) {
+ super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+ this.dispatcher = d.getEventHandler();
+ }
+
+ public void serviceInit(Configuration conf) throws Exception {
+ Preconditions.checkArgument(conf instanceof TajoConf);
+ TajoConf systemConf = (TajoConf) conf;
+
+ // seconds
+ int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.CLIENT_SESSION_EXPIRY_TIME);
+ setExpireInterval(expireIntvl);
+ setMonitorInterval(expireIntvl / 3);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void expire(String id) {
+ dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
new file mode 100644
index 0000000..24df9d8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+ private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+ public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+ private final Dispatcher dispatcher;
+ private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+ public SessionManager(Dispatcher dispatcher) {
+ super(SessionManager.class.getSimpleName());
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+ addIfService(sessionLivelinessMonitor);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
+ private void assertSessionExistence(String sessionId) throws InvalidSessionException {
+ if (!sessions.containsKey(sessionId)) {
+ throw new InvalidSessionException(sessionId);
+ }
+ }
+
+ public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+ String sessionId;
+ Session oldSession;
+
+ sessionId = UUID.randomUUID().toString();
+ Session newSession = new Session(sessionId, username, baseDatabaseName);
+ oldSession = sessions.putIfAbsent(sessionId, newSession);
+ if (oldSession != null) {
+ throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+ }
+ LOG.info("Session " + sessionId + " is created." );
+ return sessionId;
+ }
+
+ public void removeSession(String sessionId) {
+ if (sessions.containsKey(sessionId)) {
+ sessions.remove(sessionId);
+ LOG.info("Session " + sessionId + " is removed.");
+ } else {
+ LOG.error("No such session id: " + sessionId);
+ }
+ }
+
+ public Session getSession(String sessionId) throws InvalidSessionException {
+ assertSessionExistence(sessionId);
+ touch(sessionId);
+ return sessions.get(sessionId);
+ }
+
+ public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+ assertSessionExistence(sessionId);
+ touch(sessionId);
+ sessions.get(sessionId).setVariable(name, value);
+ }
+
+ public String getVariable(String sessionId, String name)
+ throws InvalidSessionException, NoSuchSessionVariableException {
+ assertSessionExistence(sessionId);
+ touch(sessionId);
+ return sessions.get(sessionId).getVariable(name);
+ }
+
+ public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+ assertSessionExistence(sessionId);
+ touch(sessionId);
+ sessions.get(sessionId).removeVariable(name);
+ }
+
+ public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+ assertSessionExistence(sessionId);
+ touch(sessionId);
+ return sessions.get(sessionId).getAllVariables();
+ }
+
+ public void touch(String sessionId) throws InvalidSessionException {
+ assertSessionExistence(sessionId);
+ sessions.get(sessionId).updateLastAccessTime();
+ sessionLivelinessMonitor.receivedPing(sessionId);
+ }
+
+ @Override
+ public void handle(SessionEvent event) {
+ LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+ try {
+ assertSessionExistence(event.getSessionId());
+ touch(event.getSessionId());
+ } catch (InvalidSessionException e) {
+ LOG.error(e);
+ }
+
+ if (event.getType() == SessionEventType.EXPIRE) {
+ Session session = sessions.remove(event.getSessionId());
+ LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 10e8ec2..7c94e33 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -28,6 +28,14 @@ import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
+message SessionProto {
+ required string session_id = 1;
+ required string username = 2;
+ required string current_database = 3;
+ required int64 last_access_time = 4;
+ required KeyValueSetProto variables = 5;
+}
+
message TaskStatusProto {
required QueryUnitAttemptIdProto id = 1;
required string workerName = 2;
@@ -109,9 +117,10 @@ message ShuffleFileOutput {
message QueryExecutionRequestProto {
required QueryIdProto queryId = 1;
- required KeyValueSetProto queryContext = 2;
- required StringProto sql = 3;
- optional StringProto logicalPlanJson = 4;
+ required SessionProto session = 2;
+ required KeyValueSetProto queryContext = 3;
+ required StringProto sql = 4;
+ optional StringProto logicalPlanJson = 5;
}
message GetTaskRequestProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
index 397146a..29fd05c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
@@ -49,13 +49,13 @@
TableDesc tableDesc = null;
String selectedTable = request.getParameter("table");
if(selectedTable != null && !selectedTable.trim().isEmpty()) {
- tableDesc = catalog.getTableDesc(selectedTable);
+ tableDesc = catalog.getTableDesc(selectedDatabase, selectedTable);
} else {
selectedTable = "";
}
//TODO filter with database
- Collection<String> tableNames = catalog.getAllTableNames();
+ Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase);
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
@@ -83,8 +83,16 @@
<td width="20%" valign="top">
<div>
<b>Database:</b>
- <select width="190" style="width: 190px">
- <option value="default" selected>default</option>
+ <select width="190" style="width: 190px" onchange="document.location.href='catalogview.jsp?database=' + this.value">
+ <%
+ for (String databaseName : catalog.getAllDatabaseNames()) {
+ if (selectedDatabase.equals(databaseName)) { %>
+ <option value="<%=databaseName%>" selected><%=databaseName%>
+ <%} else {%>
+ <option value="<%=databaseName%>"><%=databaseName%></option>
+ <%}
+ }
+ %>
</select>
</div>
<!-- table list -->
http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 144ca1b..e651313 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -30,10 +31,12 @@ import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.session.Session;
import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.sql.ResultSet;
+import java.util.UUID;
public class LocalTajoTestingUtility {
private static final Log LOG = LogFactory.getLog(LocalTajoTestingUtility.class);
@@ -42,6 +45,16 @@ public class LocalTajoTestingUtility {
private TajoConf conf;
private TajoClient client;
+ private static UserGroupInformation dummyUserInfo;
+
+ static {
+ try {
+ dummyUserInfo = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
private static int taskAttemptId;
public static QueryUnitAttemptId newQueryUnitAttemptId() {
@@ -51,6 +64,9 @@ public class LocalTajoTestingUtility {
public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
}
+ public static Session createDummySession() {
+ return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME);
+ }
/**
* for test
@@ -88,9 +104,11 @@ public class LocalTajoTestingUtility {
// It gives more various situations to unit tests.
TableStats stats = new TableStats();
stats.setNumBytes(TPCH.tableVolumes.get(names[i]));
- TableDesc tableDesc = new TableDesc(names[i], schemas[i], meta, tablePath);
+ TableDesc tableDesc = new TableDesc(
+ CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, names[i]), schemas[i], meta,
+ tablePath);
tableDesc.setStats(stats);
- util.getMaster().getCatalog().addTable(tableDesc);
+ util.getMaster().getCatalog().createTable(tableDesc);
}
LOG.info("===================================================");