You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/25 02:36:21 UTC

[06/13] TAJO-353: Add Database support to Tajo. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index dc70f23..a504e7b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -27,11 +27,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
@@ -47,15 +45,22 @@ import org.apache.tajo.master.querymaster.QueryJobEvent;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.Worker;
 import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.session.InvalidSessionException;
+import org.apache.tajo.master.session.NoSuchSessionVariableException;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.BlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.*;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
 public class TajoMasterClientService extends AbstractService {
   private final static Log LOG = LogFactory.getLog(TajoMasterClientService.class);
   private final MasterContext context;
@@ -111,32 +116,150 @@ public class TajoMasterClientService extends AbstractService {
     return this.bindAddress;
   }
 
-  public int getHttpPort() {
-    return 0;
-  }
   /////////////////////////////////////////////////////////////////////////////
   // TajoMasterClientProtocolService
   /////////////////////////////////////////////////////////////////////////////
   public class TajoMasterClientProtocolServiceHandler implements TajoMasterClientProtocolService.BlockingInterface {
     @Override
-    public BoolProto updateSessionVariables(RpcController controller,
-                                            UpdateSessionVariableRequest request)
+    public CreateSessionResponse createSession(RpcController controller, CreateSessionRequest request)
         throws ServiceException {
-      return null;
+      try {
+        // create a new session with base database name. If no database name is give, we use default database.
+        String databaseName = request.hasBaseDatabaseName() ? request.getBaseDatabaseName() : DEFAULT_DATABASE_NAME;
+
+        if (!context.getCatalog().existDatabase(databaseName)) {
+          LOG.info("Session creation is canceled due to absent base database \"" + databaseName + "\".");
+          throw new NoSuchDatabaseException(databaseName);
+        }
+
+        String sessionId =
+            context.getSessionManager().createSession(request.getUsername(), databaseName);
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.SUCCESS);
+        builder.setSessionId(TajoIdProtos.SessionIdProto.newBuilder().setId(sessionId).build());
+        return builder.build();
+      } catch (NoSuchDatabaseException nsde) {
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.FAILED);
+        builder.setMessage(nsde.getMessage());
+        return builder.build();
+      } catch (InvalidSessionException e) {
+        CreateSessionResponse.Builder builder = CreateSessionResponse.newBuilder();
+        builder.setState(CreateSessionResponse.ResultState.FAILED);
+        builder.setMessage(e.getMessage());
+        return builder.build();
+      }
     }
 
     @Override
-    public ExplainQueryResponse explainQuery(RpcController controller,
-                                           ExplainQueryRequest request)
+    public BoolProto removeSession(RpcController controller, TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      if (request != null) {
+        context.getSessionManager().removeSession(request.getId());
+      }
+      return ProtoUtil.TRUE;
+    }
+
+    @Override
+    public BoolProto updateSessionVariables(RpcController controller, UpdateSessionVariableRequest request)
+        throws ServiceException {
+      try {
+        String sessionId = request.getSessionId().getId();
+        for (CatalogProtos.KeyValueProto kv : request.getSetVariables().getKeyvalList()) {
+          context.getSessionManager().setVariable(sessionId, kv.getKey(), kv.getValue());
+        }
+        for (String unsetVariable : request.getUnsetVariablesList()) {
+          context.getSessionManager().removeVariable(sessionId, unsetVariable);
+        }
+        return ProtoUtil.TRUE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public StringProto getSessionVariable(RpcController controller, SessionedStringProto request)
         throws ServiceException {
 
       try {
+        return ProtoUtil.convertString(
+            context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue()));
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public BoolProto existSessionVariable(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        String value = context.getSessionManager().getVariable(request.getSessionId().getId(), request.getValue());
+        if (value != null) {
+          return ProtoUtil.TRUE;
+        } else {
+          return ProtoUtil.FALSE;
+        }
+      } catch (NoSuchSessionVariableException nssv) {
+        return ProtoUtil.FALSE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public CatalogProtos.KeyValueSetProto getAllSessionVariables(RpcController controller,
+                                                                 TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      try {
+        String sessionId = request.getId();
+        Options options = new Options();
+        options.putAll(context.getSessionManager().getAllVariables(sessionId));
+        return options.getProto();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public StringProto getCurrentDatabase(RpcController controller, TajoIdProtos.SessionIdProto request)
+        throws ServiceException {
+      try {
+        String sessionId = request.getId();
+        return ProtoUtil.convertString(context.getSessionManager().getSession(sessionId).getCurrentDatabase());
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public BoolProto selectDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        String sessionId = request.getSessionId().getId();
+        String databaseName = CatalogUtil.normalizeIdentifier(request.getValue());
+
+        if (context.getCatalog().existDatabase(databaseName)) {
+          context.getSessionManager().getSession(sessionId).selectDatabase(databaseName);
+          return ProtoUtil.TRUE;
+        } else {
+          throw new ServiceException(new NoSuchDatabaseException(databaseName));
+        }
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
+    }
+
+    @Override
+    public ExplainQueryResponse explainQuery(RpcController controller,
+                                           SessionedStringProto request) throws ServiceException {
+
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
         if(LOG.isDebugEnabled()) {
-          LOG.debug("ExplainQuery [" + request.getQuery() + "]");
+          LOG.debug("ExplainQuery [" + request.getValue() + "]");
         }
         ClientProtos.ExplainQueryResponse.Builder responseBuilder = ClientProtos.ExplainQueryResponse.newBuilder();
         responseBuilder.setResultCode(ResultCode.OK);
-        String plan = context.getGlobalEngine().explainQuery(request.getQuery());
+        String plan = context.getGlobalEngine().explainQuery(session, request.getValue());
         if(LOG.isDebugEnabled()) {
           LOG.debug("ExplainQuery [" + plan + "]");
         }
@@ -151,15 +274,16 @@ public class TajoMasterClientService extends AbstractService {
       }
     }
     @Override
-    public GetQueryStatusResponse submitQuery(RpcController controller,
-                                           QueryRequest request)
-        throws ServiceException {
+    public GetQueryStatusResponse submitQuery(RpcController controller, QueryRequest request) throws ServiceException {
+
 
       try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
         if(LOG.isDebugEnabled()) {
           LOG.debug("Query [" + request.getQuery() + "] is submitted");
         }
-        return context.getGlobalEngine().executeQuery(request.getQuery());
+        return context.getGlobalEngine().executeQuery(session, request.getQuery());
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
@@ -174,119 +298,135 @@ public class TajoMasterClientService extends AbstractService {
     }
 
     @Override
-    public UpdateQueryResponse updateQuery(RpcController controller,
-                                           QueryRequest request)
-        throws ServiceException {
+    public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest request) throws ServiceException {
 
-      UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
       try {
-        context.getGlobalEngine().updateQuery(request.getQuery());
-        builder.setResultCode(ResultCode.OK);
-        return builder.build();
-      } catch (Exception e) {
-        builder.setResultCode(ResultCode.ERROR);
-        if (e.getMessage() == null) {
-          builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        UpdateQueryResponse.Builder builder = UpdateQueryResponse.newBuilder();
+        try {
+          context.getGlobalEngine().updateQuery(session, request.getQuery());
+          builder.setResultCode(ResultCode.OK);
+          return builder.build();
+        } catch (Exception e) {
+          builder.setResultCode(ResultCode.ERROR);
+          if (e.getMessage() == null) {
+            builder.setErrorMessage(ExceptionUtils.getStackTrace(e));
+          }
+          return builder.build();
         }
-        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
       }
     }
 
     @Override
     public GetQueryResultResponse getQueryResult(RpcController controller,
-                                                 GetQueryResultRequest request)
-        throws ServiceException {
-      QueryId queryId = new QueryId(request.getQueryId());
-      QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
-      QueryInfo queryInfo = queryInProgress.getQueryInfo();
-      GetQueryResultResponse.Builder builder
-          = GetQueryResultResponse.newBuilder();
-
+                                                 GetQueryResultRequest request) throws ServiceException {
       try {
-        //TODO After implementation Tajo's user security feature, Should be modified.
-        builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
-      } catch (IOException e) {
-        LOG.warn("Can't get current user name");
-      }
-      switch (queryInfo.getQueryState()) {
-        case QUERY_SUCCEEDED:
-          // TODO check this logic needed
-          //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
-          break;
-        case QUERY_FAILED:
-        case QUERY_ERROR:
-          builder.setErrorMessage("Query " + queryId + " is failed");
-        default:
-          builder.setErrorMessage("Query " + queryId + " is still running");
-      }
+        context.getSessionManager().touch(request.getSessionId().getId());
+        QueryId queryId = new QueryId(request.getQueryId());
+        QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+        QueryInfo queryInfo = queryInProgress.getQueryInfo();
+        GetQueryResultResponse.Builder builder
+            = GetQueryResultResponse.newBuilder();
+
+        try {
+          //TODO After implementation Tajo's user security feature, Should be modified.
+          builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName());
+        } catch (IOException e) {
+          LOG.warn("Can't get current user name");
+        }
+        switch (queryInfo.getQueryState()) {
+          case QUERY_SUCCEEDED:
+            // TODO check this logic needed
+            //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+        }
 
-      return builder.build();
+        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
-    public GetQueryListResponse getRunningQueryList(RpcController controller,
-                                             GetQueryListRequest request)
+    public GetQueryListResponse getRunningQueryList(RpcController controller, GetQueryListRequest request)
+
         throws ServiceException {
-      GetQueryListResponse.Builder builder
-        = GetQueryListResponse.newBuilder(); 
-       
-      Collection<QueryInProgress> queries 
-        = context.getQueryJobManager().getRunningQueries();
-
-      BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
- 
-      for (QueryInProgress queryInProgress : queries) {
-        QueryInfo queryInfo = queryInProgress.getQueryInfo();
 
-        infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
-        infoBuilder.setState(queryInfo.getQueryState());
-        infoBuilder.setQuery(queryInfo.getSql());
-        infoBuilder.setStartTime(queryInfo.getStartTime());
-        long endTime = (queryInfo.getFinishTime() == 0) ? 
-                       System.currentTimeMillis() : queryInfo.getFinishTime();
-        infoBuilder.setFinishTime(endTime);
-        infoBuilder.setProgress(queryInfo.getProgress());
-        infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
-        infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-
-        builder.addQueryList(infoBuilder.build());
-      }
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        GetQueryListResponse.Builder builder= GetQueryListResponse.newBuilder();
 
-      GetQueryListResponse result = builder.build();
-      return result;
+        Collection<QueryInProgress> queries
+          = context.getQueryJobManager().getRunningQueries();
+
+        BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+
+        for (QueryInProgress queryInProgress : queries) {
+          QueryInfo queryInfo = queryInProgress.getQueryInfo();
+
+          infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+          infoBuilder.setState(queryInfo.getQueryState());
+          infoBuilder.setQuery(queryInfo.getSql());
+          infoBuilder.setStartTime(queryInfo.getStartTime());
+          long endTime = (queryInfo.getFinishTime() == 0) ?
+                         System.currentTimeMillis() : queryInfo.getFinishTime();
+          infoBuilder.setFinishTime(endTime);
+          infoBuilder.setProgress(queryInfo.getProgress());
+          infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+          infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+
+          builder.addQueryList(infoBuilder.build());
+        }
+
+        GetQueryListResponse result = builder.build();
+        return result;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
-    public GetQueryListResponse getFinishedQueryList(RpcController controller,
-                                             GetQueryListRequest request)
+    public GetQueryListResponse getFinishedQueryList(RpcController controller, GetQueryListRequest request)
         throws ServiceException {
-      GetQueryListResponse.Builder builder
-          = GetQueryListResponse.newBuilder();
 
-      Collection<QueryInProgress> queries
-          = context.getQueryJobManager().getFinishedQueries();
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        GetQueryListResponse.Builder builder = GetQueryListResponse.newBuilder();
 
-      BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
+        Collection<QueryInProgress> queries
+            = context.getQueryJobManager().getFinishedQueries();
 
-      for (QueryInProgress queryInProgress : queries) {
-        QueryInfo queryInfo = queryInProgress.getQueryInfo();
+        BriefQueryInfo.Builder infoBuilder = BriefQueryInfo.newBuilder();
 
-        infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
-        infoBuilder.setState(queryInfo.getQueryState());
-        infoBuilder.setQuery(queryInfo.getSql());
-        infoBuilder.setStartTime(queryInfo.getStartTime());
-        long endTime = (queryInfo.getFinishTime() == 0) ?
-            System.currentTimeMillis() : queryInfo.getFinishTime();
-        infoBuilder.setFinishTime(endTime);
-        infoBuilder.setProgress(queryInfo.getProgress());
-        infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
-        infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-
-        builder.addQueryList(infoBuilder.build());
-      }
+        for (QueryInProgress queryInProgress : queries) {
+          QueryInfo queryInfo = queryInProgress.getQueryInfo();
 
-      GetQueryListResponse result = builder.build();
-      return result;
+          infoBuilder.setQueryId(queryInfo.getQueryId().getProto());
+          infoBuilder.setState(queryInfo.getQueryState());
+          infoBuilder.setQuery(queryInfo.getSql());
+          infoBuilder.setStartTime(queryInfo.getStartTime());
+          long endTime = (queryInfo.getFinishTime() == 0) ?
+              System.currentTimeMillis() : queryInfo.getFinishTime();
+          infoBuilder.setFinishTime(endTime);
+          infoBuilder.setProgress(queryInfo.getProgress());
+          infoBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort());
+          infoBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+
+          builder.addQueryList(infoBuilder.build());
+        }
+
+        GetQueryListResponse result = builder.build();
+        return result;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
@@ -294,136 +434,242 @@ public class TajoMasterClientService extends AbstractService {
                                                  GetQueryStatusRequest request)
         throws ServiceException {
 
-      GetQueryStatusResponse.Builder builder
-          = GetQueryStatusResponse.newBuilder();
-      QueryId queryId = new QueryId(request.getQueryId());
-      builder.setQueryId(request.getQueryId());
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
 
-      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
-        builder.setResultCode(ResultCode.OK);
-        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
-      } else {
-        QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
-        if (queryInProgress != null) {
-          QueryInfo queryInfo = queryInProgress.getQueryInfo();
+        GetQueryStatusResponse.Builder builder = GetQueryStatusResponse.newBuilder();
+        QueryId queryId = new QueryId(request.getQueryId());
+        builder.setQueryId(request.getQueryId());
+
+        if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
           builder.setResultCode(ResultCode.OK);
-          builder.setState(queryInfo.getQueryState());
-          builder.setProgress(queryInfo.getProgress());
-          builder.setSubmitTime(queryInfo.getStartTime());
-          if(queryInfo.getQueryMasterHost() != null) {
-            builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
-            builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
-          }
-          //builder.setInitTime(queryJobManager.getInitializationTime());
-          //builder.setHasResult(!queryJobManager.isCreateTableStmt());
-          if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(queryInfo.getFinishTime());
+          builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+        } else {
+          QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+          if (queryInProgress != null) {
+            QueryInfo queryInfo = queryInProgress.getQueryInfo();
+            builder.setResultCode(ResultCode.OK);
+            builder.setState(queryInfo.getQueryState());
+            builder.setProgress(queryInfo.getProgress());
+            builder.setSubmitTime(queryInfo.getStartTime());
+            if(queryInfo.getQueryMasterHost() != null) {
+              builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+              builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+            }
+            //builder.setInitTime(queryJobManager.getInitializationTime());
+            //builder.setHasResult(!queryJobManager.isCreateTableStmt());
+            if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+              builder.setFinishTime(queryInfo.getFinishTime());
+            } else {
+              builder.setFinishTime(System.currentTimeMillis());
+            }
           } else {
-            builder.setFinishTime(System.currentTimeMillis());
+            builder.setResultCode(ResultCode.ERROR);
+            builder.setErrorMessage("No such query: " + queryId.toString());
           }
-        } else {
-          builder.setResultCode(ResultCode.ERROR);
-          builder.setErrorMessage("No such query: " + queryId.toString());
         }
-      }
+        return builder.build();
 
-      return builder.build();
+      } catch (Throwable t) {
+        throw new  ServiceException(t);
+      }
     }
 
     /**
      * It is invoked by TajoContainerProxy.
      */
     @Override
-    public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
-      QueryId queryId = new QueryId(request);
-      QueryJobManager queryJobManager = context.getQueryJobManager();
-      queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
-          new QueryInfo(queryId)));
-      return BOOL_TRUE;
+    public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        QueryId queryId = new QueryId(request.getQueryId());
+        QueryJobManager queryJobManager = context.getQueryJobManager();
+        queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+            new QueryInfo(queryId)));
+        return BOOL_TRUE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
     public GetClusterInfoResponse getClusterInfo(RpcController controller,
                                                  GetClusterInfoRequest request)
         throws ServiceException {
-      GetClusterInfoResponse.Builder builder
-        = GetClusterInfoResponse.newBuilder(); 
-       
-      Map<String, Worker> workers = context.getResourceManager().getWorkers();
-
-      List<String> wokerKeys = new ArrayList<String>(workers.keySet());
-      Collections.sort(wokerKeys);
-
-      WorkerResourceInfo.Builder workerBuilder
-        = WorkerResourceInfo.newBuilder();
-
-      for(Worker worker: workers.values()) {
-        WorkerResource workerResource = worker.getResource();
-        workerBuilder.setAllocatedHost(worker.getHostName());
-        workerBuilder.setDiskSlots(workerResource.getDiskSlots());
-        workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
-        workerBuilder.setMemoryMB(workerResource.getMemoryMB());
-        workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
-        workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
-        workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
-        workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
-        workerBuilder.setWorkerStatus(worker.getState().toString());
-        workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
-        workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
-        workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
-        workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
-        workerBuilder.setClientPort(worker.getClientPort());
-        workerBuilder.setPullServerPort(worker.getPullServerPort());
-        workerBuilder.setHttpPort(worker.getHttpPort());
-        workerBuilder.setMaxHeap(workerResource.getMaxHeap());
-        workerBuilder.setFreeHeap(workerResource.getFreeHeap());
-        workerBuilder.setTotalHeap(workerResource.getTotalHeap());
-        workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
-        workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
-
-        builder.addWorkerList(workerBuilder.build());
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        GetClusterInfoResponse.Builder builder= GetClusterInfoResponse.newBuilder();
+
+        Map<String, Worker> workers = context.getResourceManager().getWorkers();
+
+        List<String> wokerKeys = new ArrayList<String>(workers.keySet());
+        Collections.sort(wokerKeys);
+
+        WorkerResourceInfo.Builder workerBuilder
+          = WorkerResourceInfo.newBuilder();
+
+        for(Worker worker: workers.values()) {
+          WorkerResource workerResource = worker.getResource();
+          workerBuilder.setAllocatedHost(worker.getHostName());
+          workerBuilder.setDiskSlots(workerResource.getDiskSlots());
+          workerBuilder.setCpuCoreSlots(workerResource.getCpuCoreSlots());
+          workerBuilder.setMemoryMB(workerResource.getMemoryMB());
+          workerBuilder.setLastHeartbeat(worker.getLastHeartbeatTime());
+          workerBuilder.setUsedMemoryMB(workerResource.getUsedMemoryMB());
+          workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
+          workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
+          workerBuilder.setWorkerStatus(worker.getState().toString());
+          workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
+          workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
+          workerBuilder.setPeerRpcPort(worker.getPeerRpcPort());
+          workerBuilder.setQueryMasterPort(worker.getQueryMasterPort());
+          workerBuilder.setClientPort(worker.getClientPort());
+          workerBuilder.setPullServerPort(worker.getPullServerPort());
+          workerBuilder.setHttpPort(worker.getHttpPort());
+          workerBuilder.setMaxHeap(workerResource.getMaxHeap());
+          workerBuilder.setFreeHeap(workerResource.getFreeHeap());
+          workerBuilder.setTotalHeap(workerResource.getTotalHeap());
+          workerBuilder.setNumRunningTasks(workerResource.getNumRunningTasks());
+          workerBuilder.setNumQueryMasterTasks(workerResource.getNumQueryMasterTasks());
+
+          builder.addWorkerList(workerBuilder.build());
+        }
+
+        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
       }
+    }
 
-      return builder.build();
+    @Override
+    public BoolProto createDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        if (context.getGlobalEngine().createDatabase(session, request.getValue(), null, false)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
     }
 
     @Override
-    public BoolProto existTable(RpcController controller,
-                                StringProto tableNameProto)
-        throws ServiceException {
-      String tableName = tableNameProto.getValue();
-      if (catalog.existsTable(tableName)) {
-        return BOOL_TRUE;
-      } else {
-        return BOOL_FALSE;
+    public BoolProto existDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        if (catalog.existDatabase(request.getValue())) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public BoolProto dropDatabase(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        if (context.getGlobalEngine().dropDatabase(session, request.getValue(), false)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public PrimitiveProtos.StringListProto getAllDatabases(RpcController controller, TajoIdProtos.SessionIdProto
+        request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getId());
+        return ProtoUtil.convertStrings(catalog.getAllDatabaseNames());
+      } catch (Throwable e) {
+        throw new ServiceException(e);
+      }
+    }
+
+    @Override
+    public BoolProto existTable(RpcController controller, SessionedStringProto request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String databaseName;
+        String tableName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getValue();
+        }
+
+        if (catalog.existsTable(databaseName, tableName)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (Throwable e) {
+        throw new ServiceException(e);
       }
     }
 
     @Override
     public GetTableListResponse getTableList(RpcController controller,
-                                             GetTableListRequest request)
-        throws ServiceException {
-      Collection<String> tableNames = catalog.getAllTableNames();
-      GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
-      builder.addAllTables(tableNames);
-      return builder.build();
+                                             GetTableListRequest request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        String databaseName;
+        if (request.hasDatabaseName()) {
+          databaseName = request.getDatabaseName();
+        } else {
+          databaseName = session.getCurrentDatabase();
+        }
+        Collection<String> tableNames = catalog.getAllTableNames(databaseName);
+        GetTableListResponse.Builder builder = GetTableListResponse.newBuilder();
+        builder.addAllTables(tableNames);
+        return builder.build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
-    public TableResponse getTableDesc(RpcController controller,
-                                      GetTableDescRequest request)
-        throws ServiceException {
-      String name = request.getTableName();
-      if (catalog.existsTable(name)) {
-        return TableResponse.newBuilder()
-            .setResultCode(ResultCode.OK)
-            .setTableDesc(catalog.getTableDesc(name).getProto())
-            .build();
-      } else {
-        return TableResponse.newBuilder()
-            .setResultCode(ResultCode.ERROR)
-            .setErrorMessage("ERROR: no such a table: " + request.getTableName())
-            .build();
+    public TableResponse getTableDesc(RpcController controller, GetTableDescRequest request) throws ServiceException {
+      try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String databaseName;
+        String tableName;
+        if (CatalogUtil.isFQTableName(request.getTableName())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getTableName();
+        }
+
+        if (catalog.existsTable(databaseName, tableName)) {
+          return TableResponse.newBuilder()
+              .setResultCode(ResultCode.OK)
+              .setTableDesc(catalog.getTableDesc(databaseName, tableName).getProto())
+              .build();
+        } else {
+          return TableResponse.newBuilder()
+              .setResultCode(ResultCode.ERROR)
+              .setErrorMessage("ERROR: no such a table: " + request.getTableName())
+              .build();
+        }
+      } catch (Throwable t) {
+        throw new ServiceException(t);
       }
     }
 
@@ -431,6 +677,8 @@ public class TajoMasterClientService extends AbstractService {
     public TableResponse createExternalTable(RpcController controller, CreateTableRequest request)
         throws ServiceException {
       try {
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
         Path path = new Path(request.getPath());
         FileSystem fs = path.getFileSystem(conf);
 
@@ -447,8 +695,8 @@ public class TajoMasterClientService extends AbstractService {
 
         TableDesc desc;
         try {
-          desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
-              meta, path, false, partitionDesc);
+          desc = context.getGlobalEngine().createTableOnPath(session, request.getName(), schema,
+              meta, path, true, partitionDesc, false);
         } catch (Exception e) {
           return TableResponse.newBuilder()
               .setResultCode(ResultCode.ERROR)
@@ -458,6 +706,10 @@ public class TajoMasterClientService extends AbstractService {
         return TableResponse.newBuilder()
             .setResultCode(ResultCode.OK)
             .setTableDesc(desc.getProto()).build();
+      } catch (InvalidSessionException ise) {
+        return TableResponse.newBuilder()
+            .setResultCode(ResultCode.ERROR)
+            .setErrorMessage(ise.getMessage()).build();
       } catch (IOException ioe) {
         return TableResponse.newBuilder()
             .setResultCode(ResultCode.ERROR)
@@ -467,30 +719,43 @@ public class TajoMasterClientService extends AbstractService {
 
     @Override
     public BoolProto dropTable(RpcController controller, DropTableRequest dropTable) throws ServiceException {
-      context.getGlobalEngine().dropTable(dropTable.getName(), dropTable.getPurge());
-      return BOOL_TRUE;
+      try {
+        Session session = context.getSessionManager().getSession(dropTable.getSessionId().getId());
+        context.getGlobalEngine().dropTable(session, dropTable.getName(), false, dropTable.getPurge());
+        return BOOL_TRUE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
+      }
     }
 
     @Override
-    public FunctionResponse getFunctionList(RpcController controller, StringProto request) throws ServiceException {
-      String functionName = request.getValue();
-      Collection<FunctionDesc> functions = catalog.getFunctions();
+    public FunctionResponse getFunctionList(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
 
-      List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
 
-      for (FunctionDesc eachFunction: functions) {
-        if (functionName == null || functionName.isEmpty()) {
-          functionProtos.add(eachFunction.getProto());
-        } else {
-          if(functionName.equals(eachFunction.getSignature())) {
+        String functionName = request.getValue();
+        Collection<FunctionDesc> functions = catalog.getFunctions();
+
+        List<CatalogProtos.FunctionDescProto> functionProtos = new ArrayList<CatalogProtos.FunctionDescProto>();
+
+        for (FunctionDesc eachFunction: functions) {
+          if (functionName == null || functionName.isEmpty()) {
             functionProtos.add(eachFunction.getProto());
+          } else {
+            if(functionName.equals(eachFunction.getSignature())) {
+              functionProtos.add(eachFunction.getProto());
+            }
           }
         }
+        return FunctionResponse.newBuilder()
+            .setResultCode(ResultCode.OK)
+            .addAllFunctions(functionProtos)
+            .build();
+      } catch (Throwable t) {
+        throw new ServiceException(t);
       }
-      return FunctionResponse.newBuilder()
-          .setResultCode(ResultCode.OK)
-          .addAllFunctions(functionProtos)
-          .build();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index 28b5f08..8b18b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -185,7 +185,7 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
 //
 //    @Override
 //    protected String getRunnerClass() {
-//      return TaskRunner.class.getCanonicalName();
+//      return TaskRunner.class.getCanonicalSignature();
 //    }
 //
 //    @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
index 2c8b822..dd996e6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master.event;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.session.Session;
 
 /**
  * This event is conveyed to QueryMaster.
@@ -30,14 +31,17 @@ public class QueryStartEvent extends AbstractEvent {
     QUERY_START
   }
 
-  private QueryId queryId;
-  private QueryContext queryContext;
-  private String sql;
-  private String logicalPlanJson;
+  private final QueryId queryId;
+  private final Session session;
+  private final QueryContext queryContext;
+  private final String sql;
+  private final String logicalPlanJson;
 
-  public QueryStartEvent(QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
+  public QueryStartEvent(QueryId queryId, Session session, QueryContext queryContext, String sql,
+                         String logicalPlanJson) {
     super(EventType.QUERY_START);
     this.queryId = queryId;
+    this.session = session;
     this.queryContext = queryContext;
     this.sql = sql;
     this.logicalPlanJson = logicalPlanJson;
@@ -47,6 +51,10 @@ public class QueryStartEvent extends AbstractEvent {
     return queryId;
   }
 
+  public Session getSession() {
+    return this.session;
+  }
+
   public QueryContext getQueryContext() {
     return this.queryContext;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
index 08fff53..7c3d283 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/metrics/CatalogMetricsGaugeSet.java
@@ -26,6 +26,8 @@ import org.apache.tajo.master.TajoMaster;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+
 public class CatalogMetricsGaugeSet implements MetricSet {
   TajoMaster.MasterContext tajoMasterContext;
   public CatalogMetricsGaugeSet(TajoMaster.MasterContext tajoMasterContext) {
@@ -38,7 +40,7 @@ public class CatalogMetricsGaugeSet implements MetricSet {
     metricsMap.put("numTables", new Gauge<Integer>() {
       @Override
       public Integer getValue() {
-        return tajoMasterContext.getCatalog().getAllTableNames().size();
+        return tajoMasterContext.getCatalog().getAllTableNames(DEFAULT_DATABASE_NAME).size();
       }
     });
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 02ed34e..8467b4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -37,12 +37,12 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
@@ -134,6 +134,20 @@ public class Query implements EventHandler<QueryEvent> {
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+          // Transitions from QUERY_SUCCEEDED state
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // ignore-able transitions
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.KILL)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
           // Transitions from KILL_WAIT state
           .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
               QueryEventType.SUBQUERY_COMPLETED,
@@ -433,8 +447,8 @@ public class Query implements EventHandler<QueryEvent> {
       }
 
       @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
-                          ExecutionBlockId finalExecBlockId,
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId,
                           Path finalOutputDir) throws Exception {
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
@@ -446,6 +460,7 @@ public class Query implements EventHandler<QueryEvent> {
                 lastStage.getSchema(),
                 meta,
                 finalOutputDir);
+        resultTableDesc.setExternal(true);
 
         stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
         resultTableDesc.setStats(stats);
@@ -463,9 +478,8 @@ public class Query implements EventHandler<QueryEvent> {
       }
 
       @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
-                          ExecutionBlockId finalExecBlockId,
-                          Path finalOutputDir) throws Exception {
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
         CatalogService catalog = context.getWorkerContext().getCatalog();
         SubQuery lastStage = query.getSubQuery(finalExecBlockId);
         TableMeta meta = lastStage.getTableMeta();
@@ -479,6 +493,7 @@ public class Query implements EventHandler<QueryEvent> {
                 createTableNode.getTableSchema(),
                 meta,
                 finalOutputDir);
+        tableDescTobeCreated.setExternal(createTableNode.isExternal());
 
         if (createTableNode.hasPartition()) {
           tableDescTobeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
@@ -488,7 +503,7 @@ public class Query implements EventHandler<QueryEvent> {
         tableDescTobeCreated.setStats(stats);
         query.setResultDesc(tableDescTobeCreated);
 
-        catalog.addTable(tableDescTobeCreated);
+        catalog.createTable(tableDescTobeCreated);
       }
     }
 
@@ -502,9 +517,8 @@ public class Query implements EventHandler<QueryEvent> {
       }
 
       @Override
-      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
-                          ExecutionBlockId finalExecBlockId,
-                          Path finalOutputDir)
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
           throws Exception {
 
         CatalogService catalog = context.getWorkerContext().getCatalog();
@@ -528,8 +542,8 @@ public class Query implements EventHandler<QueryEvent> {
         finalTable.setStats(stats);
 
         if (insertNode.hasTargetTable()) {
-          catalog.deleteTable(insertNode.getTableName());
-          catalog.addTable(finalTable);
+          catalog.dropTable(insertNode.getTableName());
+          catalog.createTable(finalTable);
         }
 
         query.setResultDesc(finalTable);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index efb9c06..b8f312e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -37,6 +37,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -52,6 +53,8 @@ public class QueryInProgress extends CompositeService {
 
   private QueryId queryId;
 
+  private Session session;
+
   private QueryContext queryContext;
 
   private TajoAsyncDispatcher dispatcher;
@@ -74,10 +77,12 @@ public class QueryInProgress extends CompositeService {
 
   public QueryInProgress(
       TajoMaster.MasterContext masterContext,
+      Session session,
       QueryContext queryContext,
       QueryId queryId, String sql, LogicalRootNode plan) {
     super(QueryInProgress.class.getName());
     this.masterContext = masterContext;
+    this.session = session;
     this.queryContext = queryContext;
     this.queryId = queryId;
     this.plan = plan;
@@ -229,6 +234,7 @@ public class QueryInProgress extends CompositeService {
           null,
           TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
               .setQueryId(queryId.getProto())
+              .setSession(session.getProto())
               .setQueryContext(queryContext.getProto())
               .setSql(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getSql()))
               .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index 6cfb600..aa03501 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -30,6 +30,7 @@ import org.apache.tajo.engine.planner.logical.LogicalRootNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.session.Session;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -94,9 +95,10 @@ public class QueryJobManager extends CompositeService {
     return Collections.unmodifiableCollection(finishedQueries.values());
   }
 
-  public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
+  public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, LogicalRootNode plan)
+      throws Exception {
     QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
+    QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, plan);
 
     synchronized(runningQueries) {
       runningQueries.put(queryId, queryInProgress);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 9dddbf7..abdc214 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -371,7 +371,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     public void handle(QueryStartEvent event) {
       LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
       QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
-          event.getQueryId(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
+          event.getQueryId(), event.getSession(), event.getQueryContext(), event.getSql(), event.getLogicalPlanJson());
 
       queryMasterTask.init(systemConf);
       queryMasterTask.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index e3e0260..bf59e9f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -33,6 +33,7 @@ import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.LazyTaskScheduler;
 import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
@@ -225,6 +226,7 @@ public class QueryMasterManagerService extends CompositeService
       QueryId queryId = new QueryId(request.getQueryId());
       LOG.info("Receive executeQuery request:" + queryId);
       queryMaster.handle(new QueryStartEvent(queryId,
+          new Session(request.getSession()),
           new QueryContext(request.getQueryContext()), request.getSql().getValue(),
           request.getLogicalPlanJson().getValue()));
       done.run(TajoWorker.TRUE_PROTO);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 7719a85..39e2177 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -48,6 +48,7 @@ import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
@@ -76,6 +77,8 @@ public class QueryMasterTask extends CompositeService {
 
   private QueryId queryId;
 
+  private Session session;
+
   private QueryContext queryContext;
 
   private QueryMasterTaskContext queryTaskContext;
@@ -107,10 +110,13 @@ public class QueryMasterTask extends CompositeService {
   private TajoMetrics queryMetrics;
 
   public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
-                         QueryId queryId, QueryContext queryContext, String sql, String logicalPlanJson) {
+                         QueryId queryId, Session session, QueryContext queryContext, String sql,
+                         String logicalPlanJson) {
+
     super(QueryMasterTask.class.getName());
     this.queryMasterContext = queryMasterContext;
     this.queryId = queryId;
+    this.session = session;
     this.queryContext = queryContext;
     this.sql = sql;
     this.logicalPlanJson = logicalPlanJson;
@@ -306,7 +312,7 @@ public class QueryMasterTask extends CompositeService {
     }
     LogicalPlan plan = null;
     try {
-      plan = planner.createPlan(expr);
+      plan = planner.createPlan(session, expr);
       optimizer.optimize(plan);
     } catch (PlanningException e) {
       e.printStackTrace();
@@ -400,7 +406,7 @@ public class QueryMasterTask extends CompositeService {
 
       // Create a subdirectories
       defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
-      LOG.info("The staging dir '" + outputDir + "' is created.");
+      LOG.info("The staging dir '" + stagingDir + "' is created.");
       queryContext.setStagingDir(stagingDir);
 
       /////////////////////////////////////////////////
@@ -481,6 +487,10 @@ public class QueryMasterTask extends CompositeService {
       return queryMasterContext;
     }
 
+    public Session getSession() {
+      return session;
+    }
+
     public QueryContext getQueryContext() {
       return queryContext;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 1bcf38b..4bd7adb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -30,7 +30,7 @@ import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.ProtoBufUtil;
+import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOError;
 import java.net.InetSocketAddress;
@@ -109,7 +109,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   /** The response builder */
-  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoBufUtil.TRUE);
+  private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE);
 
   private static WorkerStatusEvent createStatusEvent(String workerKey, NodeHeartbeat heartbeat) {
     return new WorkerStatusEvent(

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
new file mode 100644
index 0000000..3f48ca5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/InvalidSessionException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public class InvalidSessionException extends Exception {
+  public InvalidSessionException(String sessionId) {
+    super("Invalid session id \"" + sessionId + "\"");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..686d860
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/NoSuchSessionVariableException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public class NoSuchSessionVariableException extends Exception {
+  public NoSuchSessionVariableException(String varname) {
+    super("No such session variable \"" + varname + "\"");
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
new file mode 100644
index 0000000..4d244bf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/Session.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.common.ProtoObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
+
+public class Session implements SessionConstants, ProtoObject<SessionProto> {
+  private final String sessionId;
+  private final String userName;
+  private final Map<String, String> sessionVariables;
+
+  // transient status
+  private volatile long lastAccessTime;
+  private volatile String currentDatabase;
+
+  public Session(String sessionId, String userName, String databaseName) {
+    this.sessionId = sessionId;
+    this.userName = userName;
+    this.lastAccessTime = System.currentTimeMillis();
+    this.sessionVariables = new HashMap<String, String>();
+    selectDatabase(databaseName);
+  }
+
+  public Session(SessionProto proto) {
+    sessionId = proto.getSessionId();
+    userName = proto.getUsername();
+    currentDatabase = proto.getCurrentDatabase();
+    lastAccessTime = proto.getLastAccessTime();
+    Options options = new Options(proto.getVariables());
+    sessionVariables = options.getAllKeyValus();
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void updateLastAccessTime() {
+    lastAccessTime = System.currentTimeMillis();
+  }
+
+  public long getLastAccessTime() {
+    return lastAccessTime;
+  }
+
+  public void setVariable(String name, String value) {
+    synchronized (sessionVariables) {
+      sessionVariables.put(name, value);
+    }
+  }
+
+  public String getVariable(String name) throws NoSuchSessionVariableException {
+    synchronized (sessionVariables) {
+      if (sessionVariables.containsKey(name)) {
+        return sessionVariables.get(name);
+      } else {
+        throw new NoSuchSessionVariableException(name);
+      }
+    }
+  }
+
+  public void removeVariable(String name) {
+    synchronized (sessionVariables) {
+      sessionVariables.remove(name);
+    }
+  }
+
+  public synchronized Map<String, String> getAllVariables() {
+    synchronized (sessionVariables) {
+      return ImmutableMap.copyOf(sessionVariables);
+    }
+  }
+
+  public void selectDatabase(String databaseName) {
+    this.currentDatabase = databaseName;
+  }
+
+  public String getCurrentDatabase() {
+    return this.currentDatabase;
+  }
+
+  @Override
+  public SessionProto getProto() {
+    SessionProto.Builder builder = SessionProto.newBuilder();
+    builder.setSessionId(sessionId);
+    builder.setUsername(userName);
+    builder.setCurrentDatabase(currentDatabase);
+    builder.setLastAccessTime(lastAccessTime);
+    Options variables = new Options();
+    variables.putAll(this.sessionVariables);
+    builder.setVariables(variables.getProto());
+    return builder.build();
+  }
+
+  public String toString() {
+    return "user=" + userName + ",id=" + sessionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
new file mode 100644
index 0000000..46f49a2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionConstants.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public interface SessionConstants {
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
new file mode 100644
index 0000000..dce3ba6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEvent.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class SessionEvent extends AbstractEvent<SessionEventType> {
+  private final String sessionId;
+
+  public SessionEvent(String sessionId, SessionEventType sessionEventType) {
+    super(sessionEventType);
+    this.sessionId = sessionId;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
new file mode 100644
index 0000000..64c6fc6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionEventType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+public enum SessionEventType {
+  EXPIRE,
+  PING
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
new file mode 100644
index 0000000..483920f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionLivelinessMonitor.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tajo.conf.TajoConf;
+
+public class SessionLivelinessMonitor extends AbstractLivelinessMonitor<String> {
+
+  private EventHandler dispatcher;
+
+  public SessionLivelinessMonitor(Dispatcher d) {
+    super(SessionLivelinessMonitor.class.getSimpleName(), new SystemClock());
+    this.dispatcher = d.getEventHandler();
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    Preconditions.checkArgument(conf instanceof TajoConf);
+    TajoConf systemConf = (TajoConf) conf;
+
+    // seconds
+    int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.CLIENT_SESSION_EXPIRY_TIME);
+    setExpireInterval(expireIntvl);
+    setMonitorInterval(expireIntvl / 3);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void expire(String id) {
+    dispatcher.handle(new SessionEvent(id, SessionEventType.EXPIRE));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
new file mode 100644
index 0000000..24df9d8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.session;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SessionManager extends CompositeService implements EventHandler<SessionEvent> {
+  private static final Log LOG = LogFactory.getLog(SessionManager.class);
+
+  public final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
+  private final Dispatcher dispatcher;
+  private SessionLivelinessMonitor sessionLivelinessMonitor;
+
+
+  public SessionManager(Dispatcher dispatcher) {
+    super(SessionManager.class.getSimpleName());
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    sessionLivelinessMonitor = new SessionLivelinessMonitor(dispatcher);
+    addIfService(sessionLivelinessMonitor);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  private void assertSessionExistence(String sessionId) throws InvalidSessionException {
+    if (!sessions.containsKey(sessionId)) {
+      throw new InvalidSessionException(sessionId);
+    }
+  }
+
+  public String createSession(String username, String baseDatabaseName) throws InvalidSessionException {
+    String sessionId;
+    Session oldSession;
+
+    sessionId = UUID.randomUUID().toString();
+    Session newSession = new Session(sessionId, username, baseDatabaseName);
+    oldSession = sessions.putIfAbsent(sessionId, newSession);
+    if (oldSession != null) {
+      throw new InvalidSessionException("Session id is duplicated: " + oldSession.getSessionId());
+    }
+    LOG.info("Session " + sessionId + " is created." );
+    return sessionId;
+  }
+
+  public void removeSession(String sessionId) {
+    if (sessions.containsKey(sessionId)) {
+      sessions.remove(sessionId);
+      LOG.info("Session " + sessionId + " is removed.");
+    } else {
+      LOG.error("No such session id: " + sessionId);
+    }
+  }
+
+  public Session getSession(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId);
+  }
+
+  public void setVariable(String sessionId, String name, String value) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    sessions.get(sessionId).setVariable(name, value);
+  }
+
+  public String getVariable(String sessionId, String name)
+      throws InvalidSessionException, NoSuchSessionVariableException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId).getVariable(name);
+  }
+
+  public void removeVariable(String sessionId, String name) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    sessions.get(sessionId).removeVariable(name);
+  }
+
+  public Map<String, String> getAllVariables(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    touch(sessionId);
+    return sessions.get(sessionId).getAllVariables();
+  }
+
+  public void touch(String sessionId) throws InvalidSessionException {
+    assertSessionExistence(sessionId);
+    sessions.get(sessionId).updateLastAccessTime();
+    sessionLivelinessMonitor.receivedPing(sessionId);
+  }
+
+  @Override
+  public void handle(SessionEvent event) {
+    LOG.info("Processing " + event.getSessionId() + " of type " + event.getType());
+
+    try {
+      assertSessionExistence(event.getSessionId());
+      touch(event.getSessionId());
+    } catch (InvalidSessionException e) {
+      LOG.error(e);
+    }
+
+    if (event.getType() == SessionEventType.EXPIRE) {
+      Session session = sessions.remove(event.getSessionId());
+      LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 10e8ec2..7c94e33 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -28,6 +28,14 @@ import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
 
+message SessionProto {
+  required string session_id = 1;
+  required string username = 2;
+  required string current_database = 3;
+  required int64 last_access_time = 4;
+  required KeyValueSetProto variables = 5;
+}
+
 message TaskStatusProto {
   required QueryUnitAttemptIdProto id = 1;
   required string workerName = 2;
@@ -109,9 +117,10 @@ message ShuffleFileOutput {
 
 message QueryExecutionRequestProto {
     required QueryIdProto queryId = 1;
-    required KeyValueSetProto queryContext = 2;
-    required StringProto sql = 3;
-    optional StringProto logicalPlanJson = 4;
+    required SessionProto session = 2;
+    required KeyValueSetProto queryContext = 3;
+    required StringProto sql = 4;
+    optional StringProto logicalPlanJson = 5;
 }
 
 message GetTaskRequestProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
index 397146a..29fd05c 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/catalogview.jsp
@@ -49,13 +49,13 @@
   TableDesc tableDesc = null;
   String selectedTable = request.getParameter("table");
   if(selectedTable != null && !selectedTable.trim().isEmpty()) {
-    tableDesc = catalog.getTableDesc(selectedTable);
+    tableDesc = catalog.getTableDesc(selectedDatabase, selectedTable);
   } else {
     selectedTable = "";
   }
 
   //TODO filter with database
-  Collection<String> tableNames = catalog.getAllTableNames();
+  Collection<String> tableNames = catalog.getAllTableNames(selectedDatabase);
 %>
 
 <!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
@@ -83,8 +83,16 @@
       <td width="20%" valign="top">
         <div>
           <b>Database:</b>
-          <select width="190" style="width: 190px">
-            <option value="default" selected>default</option>
+          <select width="190" style="width: 190px" onchange="document.location.href='catalogview.jsp?database=' + this.value">
+            <%
+              for (String databaseName : catalog.getAllDatabaseNames()) {
+                if (selectedDatabase.equals(databaseName)) { %>
+                  <option value="<%=databaseName%>" selected><%=databaseName%>
+                <%} else {%>
+                <option value="<%=databaseName%>"><%=databaseName%></option>
+                <%}
+              }
+            %>
           </select>
         </div>
         <!-- table list -->

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 144ca1b..e651313 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -30,10 +31,12 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.session.Session;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.sql.ResultSet;
+import java.util.UUID;
 
 public class LocalTajoTestingUtility {
   private static final Log LOG = LogFactory.getLog(LocalTajoTestingUtility.class);
@@ -42,6 +45,16 @@ public class LocalTajoTestingUtility {
   private TajoConf conf;
   private TajoClient client;
 
+  private static UserGroupInformation dummyUserInfo;
+
+  static {
+    try {
+      dummyUserInfo = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
   private static int taskAttemptId;
 
   public static QueryUnitAttemptId newQueryUnitAttemptId() {
@@ -51,6 +64,9 @@ public class LocalTajoTestingUtility {
   public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
     return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
   }
+  public static Session createDummySession() {
+    return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME);
+  }
 
   /**
    * for test
@@ -88,9 +104,11 @@ public class LocalTajoTestingUtility {
       // It gives more various situations to unit tests.
       TableStats stats = new TableStats();
       stats.setNumBytes(TPCH.tableVolumes.get(names[i]));
-      TableDesc tableDesc = new TableDesc(names[i], schemas[i], meta, tablePath);
+      TableDesc tableDesc = new TableDesc(
+          CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, names[i]), schemas[i], meta,
+          tablePath);
       tableDesc.setStats(stats);
-      util.getMaster().getCatalog().addTable(tableDesc);
+      util.getMaster().getCatalog().createTable(tableDesc);
     }
 
     LOG.info("===================================================");