You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:16 UTC

[7/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 7ae01d6..044f0ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -21,417 +21,52 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.pullserver.PullServerAuxService;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
 
 public abstract class ContainerProxy {
-  private static final Log LOG = LogFactory.getLog(ContainerProxy.class);
+  protected static final Log LOG = LogFactory.getLog(ContainerProxy.class);
 
   final public static FsPermission QUERYCONF_FILE_PERMISSION =
           FsPermission.createImmutable((short) 0644); // rw-r--r--
 
-  private final static RecordFactory recordFactory =
-          RecordFactoryProvider.getRecordFactory(null);
 
-  private static enum ContainerState {
+  protected static enum ContainerState {
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }
 
-  private final YarnRPC yarnRPC;
-  private Configuration conf;
-  private QueryMaster.QueryContext context;
+  protected final ExecutionBlockId executionBlockId;
+  protected Configuration conf;
+  protected QueryMasterTask.QueryContext context;
 
-  private ContainerState state;
+  protected ContainerState state;
   // store enough information to be able to cleanup the container
-  private Container container;
-  private ContainerId containerID;
-  final private String containerMgrAddress;
-  private ContainerToken containerToken;
-  private String hostName;
-  private int port = -1;
+  protected Container container;
+  protected ContainerId containerID;
+  protected String hostName;
+  protected int port = -1;
 
-  protected abstract void containerStarted();
-  protected abstract String getId();
-  protected abstract String getRunnerClass();
-  protected abstract Vector<CharSequence> getTaskParams();
+  public abstract void launch(ContainerLaunchContext containerLaunchContext);
+  public abstract void stopContainer();
 
-  public ContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC, Container container) {
+  public ContainerProxy(QueryMasterTask.QueryContext context, Configuration conf,
+                        ExecutionBlockId executionBlockId, Container container) {
     this.context = context;
     this.conf = conf;
-    this.yarnRPC = yarnRPC;
     this.state = ContainerState.PREP;
     this.container = container;
+    this.executionBlockId = executionBlockId;
     this.containerID = container.getId();
-    NodeId nodeId = container.getNodeId();
-    this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
-    this.containerToken = container.getContainerToken();
-  }
-
-  protected ContainerManager getCMProxy(ContainerId containerID,
-                                        final String containerManagerBindAddr,
-                                        ContainerToken containerToken)
-          throws IOException {
-    String [] hosts = containerManagerBindAddr.split(":");
-    final InetSocketAddress cmAddr =
-            new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
-              ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
-      // the user in createRemoteUser in this context has to be ContainerID
-      user = UserGroupInformation.createRemoteUser(containerID.toString());
-      user.addToken(token);
-    }
-
-    ContainerManager proxy = user.doAs(new PrivilegedAction<ContainerManager>() {
-      @Override
-      public ContainerManager run() {
-        return (ContainerManager) yarnRPC.getProxy(ContainerManager.class, cmAddr, conf);
-      }
-    });
-
-    return proxy;
   }
 
   public synchronized boolean isCompletelyDone() {
     return state == ContainerState.DONE || state == ContainerState.FAILED;
   }
 
-  @SuppressWarnings("unchecked")
-  public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
-    LOG.info("Launching Container with Id: " + containerID);
-    if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
-      state = ContainerState.DONE;
-      LOG.error("Container (" + containerID + " was killed before it was launched");
-      return;
-    }
-
-    ContainerManager proxy = null;
-    try {
-
-      proxy = getCMProxy(containerID, containerMgrAddress,
-              containerToken);
-
-      // Construct the actual Container
-      ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
-
-      // Now launch the actual container
-      StartContainerRequest startRequest = Records
-              .newRecord(StartContainerRequest.class);
-      startRequest.setContainerLaunchContext(containerLaunchContext);
-      StartContainerResponse response = proxy.startContainer(startRequest);
-
-      ByteBuffer portInfo = response
-              .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
-      if(portInfo != null) {
-        port = PullServerAuxService.deserializeMetaData(portInfo);
-      }
-
-      LOG.info("PullServer port returned by ContainerManager for "
-              + containerID + " : " + port);
-
-      if(port < 0) {
-        this.state = ContainerState.FAILED;
-        throw new IllegalStateException("Invalid shuffle port number "
-                + port + " returned for " + containerID);
-      }
-
-      containerStarted();
-
-      this.state = ContainerState.RUNNING;
-      this.hostName = containerMgrAddress.split(":")[0];
-      context.addContainer(containerID, this);
-    } catch (Throwable t) {
-      String message = "Container launch failed for " + containerID + " : "
-              + StringUtils.stringifyException(t);
-      this.state = ContainerState.FAILED;
-      LOG.error(message);
-    } finally {
-      if (proxy != null) {
-        yarnRPC.stopProxy(proxy, conf);
-      }
-    }
-  }
-
-  public synchronized void kill() {
-
-    if(isCompletelyDone()) {
-      return;
-    }
-    if(this.state == ContainerState.PREP) {
-      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-    } else {
-      LOG.info("KILLING " + containerID);
-
-      ContainerManager proxy = null;
-      try {
-        proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-                this.containerToken);
-
-        // kill the remote container if already launched
-        StopContainerRequest stopRequest = Records
-                .newRecord(StopContainerRequest.class);
-        stopRequest.setContainerId(this.containerID);
-        proxy.stopContainer(stopRequest);
-        // If stopContainer returns without an error, assuming the stop made
-        // it over to the NodeManager.
-//          context.getEventHandler().handle(
-//              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
-        context.removeContainer(containerID);
-      } catch (Throwable t) {
-
-        // ignore the cleanup failure
-        String message = "cleanup failed for container "
-                + this.containerID + " : "
-                + StringUtils.stringifyException(t);
-        LOG.warn(message);
-        this.state = ContainerState.DONE;
-        return;
-      } finally {
-        if (proxy != null) {
-          yarnRPC.stopProxy(proxy, conf);
-        }
-      }
-      this.state = ContainerState.DONE;
-    }
-  }
-
-  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config, String queryId, boolean isMaster) {
-    TajoConf conf = (TajoConf)config;
-
-    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-    try {
-      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the env variables to be setup
-    ////////////////////////////////////////////////////////////////////////////
-    LOG.info("Set the environment for the application master");
-
-    Map<String, String> environment = new HashMap<String, String>();
-    //String initialClassPath = getInitialClasspath(conf);
-    environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
-    if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
-      environment.put(ApplicationConstants.Environment.JAVA_HOME.name(),
-          System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
-    }
-
-    // TODO - to be improved with org.apache.tajo.sh shell script
-    Properties prop = System.getProperties();
-
-    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
-            (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
-      LOG.info("tajo.test is TRUE");
-      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
-      environment.put("tajo.test", "TRUE");
-    } else {
-      // Add AppMaster.jar location to classpath
-      // At some point we should not be required to add
-      // the hadoop specific classpaths to the env.
-      // It should be provided out of the box.
-      // For now setting all required classpaths including
-      // the classpath to "." for the application jar
-      StringBuilder classPathEnv = new StringBuilder("./");
-      //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
-      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
-        classPathEnv.append(':');
-        classPathEnv.append(c.trim());
-      }
-
-      classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
-      classPathEnv.append(":./log4j.properties:./*");
-      if(System.getenv("HADOOP_HOME") != null) {
-        environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
-        environment.put(ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(), System.getenv("HADOOP_HOME"));
-        environment.put(ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(), System.getenv("HADOOP_HOME"));
-        environment.put(ApplicationConstants.Environment.HADOOP_YARN_HOME.name(), System.getenv("HADOOP_HOME"));
-      }
-
-      if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
-        environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
-      }
-      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
-    }
-
-    ctx.setEnvironment(environment);
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    FileSystem fs = null;
-
-    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-
-    try {
-      fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    FileContext fsCtx = null;
-    try {
-      fsCtx = FileContext.getFileContext(conf);
-    } catch (UnsupportedFileSystemException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-
-    LOG.info("Writing a QueryConf to HDFS and add to local environment");
-    try {
-      // TODO move to tajo temp
-      Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
-      Path queryConfPath = new Path(warehousePath, queryId);
-      if(isMaster) {
-        queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME);
-      } else {
-        queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
-      }
-
-      if(!fs.exists(queryConfPath)){
-        writeConf(conf, queryConfPath);
-      } else {
-        LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());
-      }
-      LocalResource queryConfSrc = createApplicationResource(fsCtx, queryConfPath, LocalResourceType.FILE);
-
-      localResources.put(queryConfPath.getName(), queryConfSrc);
-      ctx.setLocalResources(localResources);
-
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    // TODO - move to sub-class
-    // Add shuffle token
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    try {
-      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-    }
-    ctx.setServiceData(serviceData);
-
-    return ctx;
-  }
-
-  private static LocalResource createApplicationResource(FileContext fs,
-                                                  Path p, LocalResourceType type)
-          throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-            .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-
-  private static void writeConf(Configuration conf, Path queryConfFile)
-          throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-            FileSystem.create(fs, queryConfFile,
-                    new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-
-    // Duplicate the ByteBuffers for access by multiple containers.
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-    for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
-      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the necessary command to execute the application master
-    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-    // Set java executable command
-    //LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
-    // Set Xmx based on am memory size
-    vargs.add("-Xmx2000m");
-    // Set Remote Debugging
-    //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
-    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-    //}
-    // Set class name
-    vargs.add(getRunnerClass());
-    vargs.add(getId()); // subqueryId
-    vargs.add(containerMgrAddress); // nodeId
-    vargs.add(containerID.toString()); // containerId
-    Vector<CharSequence> taskParams = getTaskParams();
-    if(taskParams != null) {
-      vargs.addAll(taskParams);
-    }
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    LOG.info("Completed setting up TaskRunner command " + command.toString());
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    return BuilderUtils.newContainerLaunchContext(containerID, commonContainerLaunchContext.getUser(),
-            container.getResource(), commonContainerLaunchContext.getLocalResources(), myEnv, commands,
-            myServiceData, null, new HashMap<ApplicationAccessType, String>());
-  }
-
   public String getTaskHostName() {
     return this.hostName;
   }
@@ -439,4 +74,8 @@ public abstract class ContainerProxy {
   public int getTaskPort() {
     return this.port;
   }
+
+  public String getId() {
+    return executionBlockId.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
index 36327ff..a92ef75 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ExecutionBlock.java
@@ -15,7 +15,7 @@
 package org.apache.tajo.master;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.planner.logical.*;
 
@@ -40,7 +40,7 @@ public class ExecutionBlock {
     RANGE
   }
 
-  private SubQueryId subQueryId;
+  private ExecutionBlockId executionBlockId;
   private LogicalNode plan = null;
   private StoreTableNode store = null;
   private List<ScanNode> scanlist = new ArrayList<ScanNode>();
@@ -50,12 +50,12 @@ public class ExecutionBlock {
   private boolean hasJoinPlan;
   private boolean hasUnionPlan;
 
-  public ExecutionBlock(SubQueryId subQueryId) {
-    this.subQueryId = subQueryId;
+  public ExecutionBlock(ExecutionBlockId executionBlockId) {
+    this.executionBlockId = executionBlockId;
   }
 
-  public SubQueryId getId() {
-    return subQueryId;
+  public ExecutionBlockId getId() {
+    return executionBlockId;
   }
 
   public String getOutputName() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 8c3617e..ee4b98d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -23,16 +23,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoProtos;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -48,18 +43,19 @@ import org.apache.tajo.engine.exception.UnknownWorkerException;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.GlobalOptimizer;
-import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
+import org.apache.tajo.engine.planner.logical.DropTableNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.EnumSet;
-import java.util.Set;
 
 @SuppressWarnings("unchecked")
 public class GlobalEngine extends AbstractService {
@@ -76,9 +72,6 @@ public class GlobalEngine extends AbstractService {
   private GlobalPlanner globalPlanner;
   private GlobalOptimizer globalOptimizer;
 
-  // Yarn
-  protected YarnClient yarnClient;
-
   public GlobalEngine(final MasterContext context)
       throws IOException {
     super(GlobalEngine.class.getName());
@@ -89,7 +82,6 @@ public class GlobalEngine extends AbstractService {
 
   public void start() {
     try  {
-      connectYarnClient();
       analyzer = new SQLAnalyzer();
       planner = new LogicalPlanner(context.getCatalog());
       optimizer = new LogicalOptimizer();
@@ -98,138 +90,60 @@ public class GlobalEngine extends AbstractService {
 
       globalOptimizer = new GlobalOptimizer();
     } catch (Throwable t) {
-      t.printStackTrace();
+      LOG.error(t.getMessage(), t);
     }
     super.start();
   }
 
   public void stop() {
     super.stop();
-    if (yarnClient != null) {
-      yarnClient.stop();
-    }
   }
 
-  public QueryId executeQuery(String tql)
+  public ClientProtos.GetQueryStatusResponse executeQuery(String sql)
       throws InterruptedException, IOException,
       NoSuchQueryIdException, IllegalQueryStatusException,
       UnknownWorkerException, EmptyClusterException {
 
-    long querySubmittionTime = context.getClock().getTime();
-    LOG.info("SQL: " + tql);
+    LOG.info("SQL: " + sql);
     // parse the query
-    Expr planningContext = analyzer.parse(tql);
+    Expr planningContext = analyzer.parse(sql);
     LogicalRootNode plan = (LogicalRootNode) createLogicalPlan(planningContext);
 
+    ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
+
     if (PlannerUtil.checkIfDDLPlan(plan)) {
       updateQuery(plan.getChild());
-      return TajoIdUtils.NullQueryId;
+
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      responseBuilder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
     } else {
-      GetNewApplicationResponse newApp = yarnClient.getNewApplication();
-      ApplicationId appId = newApp.getApplicationId();
-      QueryId queryId = TajoIdUtils.createQueryId(appId, 0);
-
-      LOG.info("Get AppId: " + appId + ", QueryId: " + queryId);
-      LOG.info("Setting up application submission context for ASM");
-
-      //request QueryMaster container
-      QueryConf queryConf = new QueryConf(context.getConf());
-      queryConf.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-      // the output table is given by user
-      if (plan.getChild().getType() == NodeType.CREATE_TABLE) {
-        CreateTableNode createTableNode = (CreateTableNode) plan.getChild();
-        queryConf.setOutputTable(createTableNode.getTableName());
+      QueryJobManager queryJobManager = context.getQueryJobManager();
+      QueryInfo queryInfo = null;
+      try {
+        queryInfo = queryJobManager.createNewQueryJob(sql, plan);
+      } catch (Exception e) {
+        responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+        responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
+        responseBuilder.setState(TajoProtos.QueryState.QUERY_ERROR);
+        responseBuilder.setErrorMessage(StringUtils.stringifyException(e));
+
+        return responseBuilder.build();
       }
-      QueryMasterManager queryMasterManager = new QueryMasterManager(context, yarnClient, queryId, tql, plan, appId,
-              context.getClock(), querySubmittionTime);
-      queryMasterManager.init(queryConf);
-      queryMasterManager.start();
-      context.addQuery(queryId, queryMasterManager);
-
-      return queryId;
-    }
-  }
-
-  private ApplicationAttemptId submitQuery() throws YarnRemoteException {
-    GetNewApplicationResponse newApp = getNewApplication();
-    ApplicationId appId = newApp.getApplicationId();
-    LOG.info("Get AppId: " + appId);
-    LOG.info("Setting up application submission context for ASM");
 
-    ApplicationSubmissionContext appContext = Records
-            .newRecord(ApplicationSubmissionContext.class);
+      //queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInfo));
 
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName("Tajo");
-
-    org.apache.hadoop.yarn.api.records.Priority
-            pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
-    pri.setPriority(5);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-
-    ContainerLaunchContext amContainer = Records
-            .newRecord(ContainerLaunchContext.class);
-    appContext.setAMContainerSpec(amContainer);
-
-    LOG.info("Submitting application to ASM");
-    yarnClient.submitApplication(appContext);
-
-    ApplicationReport appReport = monitorApplication(appId,
-            EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-    LOG.info("Launching application with id: " + attemptId);
-
-    return attemptId;
-  }
+      responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      responseBuilder.setState(queryInfo.getQueryState());
+      if(queryInfo.getQueryMasterHost() != null) {
+        responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+      }
+      responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
+    }
 
-  private ApplicationAttemptId submitQueryOld() throws YarnRemoteException {
-    GetNewApplicationResponse newApp = getNewApplication();
-    // Get a new application id
-    ApplicationId appId = newApp.getApplicationId();
-    LOG.info("Get AppId: " + appId);
-    LOG.info("Setting up application submission context for ASM");
-    ApplicationSubmissionContext appContext = Records
-        .newRecord(ApplicationSubmissionContext.class);
-
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName("Tajo");
-
-    // Set the priority for the application master
-    org.apache.hadoop.yarn.api.records.Priority
-        pri = Records.newRecord(org.apache.hadoop.yarn.api.records.Priority.class);
-    pri.setPriority(5);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-
-    // Set up the container launch context for the application master
-    ContainerLaunchContext amContainer = Records
-        .newRecord(ContainerLaunchContext.class);
-    appContext.setAMContainerSpec(amContainer);
-
-    // unmanaged AM
-    appContext.setUnmanagedAM(true);
-    LOG.info("Setting unmanaged AM");
-
-    // Submit the application to the applications manager
-    LOG.info("Submitting application to ASM");
-    yarnClient.submitApplication(appContext);
-
-    // Monitor the application to wait for launch state
-    ApplicationReport appReport = monitorApplication(appId,
-        EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-    LOG.info("Launching application with id: " + attemptId);
-
-    return attemptId;
+    ClientProtos.GetQueryStatusResponse response = responseBuilder.build();
+    return response;
   }
 
   public QueryId updateQuery(String sql) throws IOException, SQLException {
@@ -242,7 +156,7 @@ public class GlobalEngine extends AbstractService {
       throw new SQLException("This is not update query:\n" + sql);
     } else {
       updateQuery(plan.getChild());
-      return TajoIdUtils.NullQueryId;
+      return QueryIdFactory.NULL_QUERY_ID;
     }
   }
 
@@ -272,24 +186,14 @@ public class GlobalEngine extends AbstractService {
     } catch (PlanningException e) {
       LOG.error(e.getMessage(), e);
     }
-    LOG.info("LogicalPlan:\n" + plan.getRootBlock().getRoot());
 
-    return optimizedPlan;
-  }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("LogicalPlan:\n" + plan.getRootBlock().getRoot());
+    }
 
-  private MasterPlan createGlobalPlan(QueryId id, LogicalRootNode rootNode)
-      throws IOException {
-    MasterPlan globalPlan = globalPlanner.build(id, rootNode);
-    return globalOptimizer.optimize(globalPlan);
+    return optimizedPlan;
   }
 
-//  private void startQuery(final QueryId queryId, final QueryConf queryConf,
-//                          final QueryMaster query) {
-//    context.getAllQueries().put(queryId, query);
-//    query.init(queryConf);
-//    query.start();
-//  }
-
   private TableDesc createTable(CreateTableNode createTable) throws IOException {
     TableMeta meta;
 
@@ -368,58 +272,4 @@ public class GlobalEngine extends AbstractService {
 
     LOG.info("Table \"" + tableName + "\" is dropped.");
   }
-
-  private void connectYarnClient() {
-    this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(context.getConf());
-    this.yarnClient.start();
-  }
-
-  public GetNewApplicationResponse getNewApplication()
-      throws YarnRemoteException {
-    return yarnClient.getNewApplication();
-  }
-
-  /**
-   * Monitor the submitted application for completion. Kill application if time
-   * expires.
-   *
-   * @param appId
-   *          Application Id of application to be monitored
-   * @return true if application completed successfully
-   * @throws YarnRemoteException
-   */
-  private ApplicationReport monitorApplication(ApplicationId appId,
-                                               Set<YarnApplicationState> finalState) throws YarnRemoteException {
-
-    while (true) {
-
-      // Check app status every 1 second.
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        LOG.debug("Thread sleep in monitoring loop interrupted");
-      }
-
-      // Get application report for the appId we are interested in
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-
-      LOG.info("Got application report from ASM for" + ", appId="
-          + appId.getId() + ", appAttemptId="
-          + report.getCurrentApplicationAttemptId() + ", clientToken="
-          + report.getClientToken() + ", appDiagnostics="
-          + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
-          + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
-          + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
-          + ", yarnAppState=" + report.getYarnApplicationState().toString()
-          + ", distributedFinalState="
-          + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
-          + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
-      YarnApplicationState state = report.getYarnApplicationState();
-      if (finalState.contains(state)) {
-        return report;
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 8b6ba94..14c8bfd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -21,9 +21,9 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
@@ -75,9 +75,9 @@ public class GlobalPlanner {
     // insert store at the subnode of the root
     UnaryNode root = rootNode;
     if (root.getChild().getType() != NodeType.STORE) {
-      SubQueryId subQueryId = QueryIdFactory.newSubQueryId(this.queryId);
-      outputTableName = subQueryId.toString();
-      insertStore(subQueryId.toString(),root).setLocal(false);
+      ExecutionBlockId executionBlockId = QueryIdFactory.newExecutionBlockId(this.queryId);
+      outputTableName = executionBlockId.toString();
+      insertStore(executionBlockId.toString(),root).setLocal(false);
     }
     
     // convert 2-phase plan
@@ -113,10 +113,10 @@ public class GlobalPlanner {
         if (groupby.getChild().getType() != NodeType.UNION &&
             groupby.getChild().getType() != NodeType.STORE &&
             groupby.getChild().getType() != NodeType.SCAN) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           insertStore(tableId, groupby);
         }
-        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+        tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
         // insert (a store for the first group by) and (a second group by)
         PlannerUtil.transformGroupbyTo2PWithStore((GroupbyNode)node, tableId);
       } else if (node.getType() == NodeType.SORT) {
@@ -126,10 +126,10 @@ public class GlobalPlanner {
         if (sort.getChild().getType() != NodeType.UNION &&
             sort.getChild().getType() != NodeType.STORE &&
             sort.getChild().getType() != NodeType.SCAN) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           insertStore(tableId, sort);
         }
-        tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+        tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
         // insert (a store for the first sort) and (a second sort)
         PlannerUtil.transformSortTo2PWithStore((SortNode)node, tableId);
       } else if (node.getType() == NodeType.JOIN) {
@@ -138,8 +138,8 @@ public class GlobalPlanner {
         JoinNode join = (JoinNode) node;
 
         /*
-        if (join.getOuterNode().getType() == NodeType.SCAN &&
-            join.getInnerNode().getType() == NodeType.SCAN) {
+        if (join.getOuterNode().getType() == ExprType.SCAN &&
+            join.getInnerNode().getType() == ExprType.SCAN) {
           ScanNode outerScan = (ScanNode) join.getOuterNode();
           ScanNode innerScan = (ScanNode) join.getInnerNode();
 
@@ -198,14 +198,14 @@ public class GlobalPlanner {
         // insert stores for the first phase
         if (join.getLeftChild().getType() != NodeType.UNION &&
             join.getLeftChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           store = new StoreTableNode(tableId);
           store.setLocal(true);
           PlannerUtil.insertOuterNode(node, store);
         }
         if (join.getRightChild().getType() != NodeType.UNION &&
             join.getRightChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           store = new StoreTableNode(tableId);
           store.setLocal(true);
           PlannerUtil.insertInnerNode(node, store);
@@ -216,7 +216,7 @@ public class GlobalPlanner {
         // insert stores
         if (union.getLeftChild().getType() != NodeType.UNION &&
             union.getLeftChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           store = new StoreTableNode(tableId);
           if(union.getLeftChild().getType() == NodeType.GROUP_BY) {
             /*This case is for cube by operator
@@ -230,7 +230,7 @@ public class GlobalPlanner {
         }
         if (union.getRightChild().getType() != NodeType.UNION &&
             union.getRightChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           store = new StoreTableNode(tableId);
           if(union.getRightChild().getType() == NodeType.GROUP_BY) {
             /*This case is for cube by operator
@@ -246,7 +246,7 @@ public class GlobalPlanner {
         UnaryNode unary = (UnaryNode)node;
         if (unary.getType() != NodeType.STORE &&
             unary.getChild().getType() != NodeType.STORE) {
-          tableId = QueryIdFactory.newSubQueryId(queryId).toString();
+          tableId = QueryIdFactory.newExecutionBlockId(queryId).toString();
           insertStore(tableId, unary);
         }
       }
@@ -283,11 +283,11 @@ public class GlobalPlanner {
       
       if (node.getType() == NodeType.STORE) {
         store = (StoreTableNode) node;
-        SubQueryId id;
-        if (store.getTableName().startsWith(QueryId.PREFIX)) {
-          id = TajoIdUtils.newSubQueryId(store.getTableName());
+        ExecutionBlockId id;
+        if (store.getTableName().startsWith(ExecutionBlockId.EB_ID_PREFIX)) {
+          id = TajoIdUtils.createExecutionBlockId(store.getTableName());
         } else {
-          id = QueryIdFactory.newSubQueryId(queryId);
+          id = QueryIdFactory.newExecutionBlockId(queryId);
         }
         subQuery = new ExecutionBlock(id);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
new file mode 100644
index 0000000..8f83557
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TajoAsyncDispatcher extends AbstractService  implements Dispatcher {
+
+  private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class);
+
+  private final BlockingQueue<Event> eventQueue;
+  private volatile boolean stopped = false;
+
+  private Thread eventHandlingThread;
+  protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
+  private boolean exitOnDispatchException;
+
+  private String id;
+
+  public TajoAsyncDispatcher(String id) {
+    this(id, new LinkedBlockingQueue<Event>());
+  }
+
+  public TajoAsyncDispatcher(String id, BlockingQueue<Event> eventQueue) {
+    super(TajoAsyncDispatcher.class.getName());
+    this.id = id;
+    this.eventQueue = eventQueue;
+    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
+  }
+
+  Runnable createThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
+          Event event;
+          try {
+            event = eventQueue.take();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug(id + ",event take:" + event.getType() + "," + event);
+            }
+          } catch(InterruptedException ie) {
+            if (!stopped) {
+              LOG.warn("AsyncDispatcher thread interrupted", ie);
+            }
+            return;
+          }
+          dispatch(event);
+        }
+      }
+    };
+  }
+
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    //start all the components
+    super.start();
+    eventHandlingThread = new Thread(createThread());
+    eventHandlingThread.setName("AsyncDispatcher event handler");
+    eventHandlingThread.start();
+
+    LOG.info("AsyncDispatcher started:" + id);
+  }
+
+  @Override
+  public synchronized void stop() {
+    if(stopped) {
+      return;
+    }
+    stopped = true;
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+      try {
+        eventHandlingThread.join();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted Exception while stopping", ie);
+      }
+    }
+
+    // stop all the components
+    super.stop();
+
+    LOG.info("AsyncDispatcher stopped:" + id);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void dispatch(Event event) {
+    //all events go thru this loop
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Dispatching the event " + event.getClass().getName() + "."
+          + event.toString());
+    }
+//    LOG.info("====> Dispatching the event " + event.getClass().getName() + "."
+//        + event.toString() );
+    Class<? extends Enum> type = event.getType().getDeclaringClass();
+
+    try{
+      EventHandler handler = eventDispatchers.get(type);
+      if(handler != null) {
+        handler.handle(event);
+      } else {
+        throw new Exception("No handler for registered for " + type);
+      }
+    } catch (Throwable t) {
+      //TODO Maybe log the state of the queue
+      LOG.fatal("Error in dispatcher thread:" + event.getType(), t);
+      if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) {
+        LOG.info("Exiting, bye..");
+        System.exit(-1);
+      }
+    } finally {
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void register(Class<? extends Enum> eventType,
+                       EventHandler handler) {
+    /* check to see if we have a listener registered */
+    EventHandler<Event> registeredHandler = (EventHandler<Event>)
+        eventDispatchers.get(eventType);
+    LOG.debug("Registering " + eventType + " for " + handler.getClass());
+    if (registeredHandler == null) {
+      eventDispatchers.put(eventType, handler);
+    } else if (!(registeredHandler instanceof MultiListenerHandler)){
+      /* for multiple listeners of an event add the multiple listener handler */
+      MultiListenerHandler multiHandler = new MultiListenerHandler();
+      multiHandler.addHandler(registeredHandler);
+      multiHandler.addHandler(handler);
+      eventDispatchers.put(eventType, multiHandler);
+    } else {
+      /* already a multilistener, just add to it */
+      MultiListenerHandler multiHandler
+          = (MultiListenerHandler) registeredHandler;
+      multiHandler.addHandler(handler);
+    }
+  }
+
+  @Override
+  public EventHandler getEventHandler() {
+    return new GenericEventHandler();
+  }
+
+  class GenericEventHandler implements EventHandler<Event> {
+    public void handle(Event event) {
+      /* all this method does is enqueue all the events onto the queue */
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue: "
+            + remCapacity);
+      }
+      try {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(id + ",add event:" +
+              event.getType() + "," + event + "," +
+              (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive()));
+        }
+        eventQueue.put(event);
+      } catch (InterruptedException e) {
+        if (!stopped) {
+          LOG.warn("AsyncDispatcher thread interrupted", e);
+        }
+        throw new YarnException(e);
+      }
+    }
+  }
+
+  /**
+   * Multiplexing an event. Sending it to different handlers that
+   * are interested in the event.
+   */
+  static class MultiListenerHandler implements EventHandler<Event> {
+    List<EventHandler<Event>> listofHandlers;
+
+    public MultiListenerHandler() {
+      listofHandlers = new ArrayList<EventHandler<Event>>();
+    }
+
+    @Override
+    public void handle(Event event) {
+      for (EventHandler<Event> handler: listofHandlers) {
+        handler.handle(event);
+      }
+    }
+
+    void addHandler(EventHandler<Event> handler) {
+      listofHandlers.add(handler);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
new file mode 100644
index 0000000..5359311
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TajoContainerProxy extends ContainerProxy {
+  public TajoContainerProxy(QueryMasterTask.QueryContext context,
+                            Configuration conf, Container container,
+                            ExecutionBlockId executionBlockId) {
+    super(context, conf, executionBlockId, container);
+  }
+
+  @Override
+  public void launch(ContainerLaunchContext containerLaunchContext) {
+    context.getResourceAllocator().addContainer(containerID, this);
+    this.hostName = container.getNodeId().getHost();
+    this.port = context.getQueryMasterContext().getWorkerContext().getPullService().getPort();
+    this.state = ContainerState.RUNNING;
+
+    assignExecutionBlock(executionBlockId, container);
+
+    context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
+  }
+
+  private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
+    ProtoAsyncRpcClient tajoWorkerRpc = null;
+    try {
+      InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext()
+          .getTajoWorkerManagerService().getBindAddr();
+
+      InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+      tajoWorkerRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+
+      TajoWorkerProtocol.RunExecutionBlockRequestProto request =
+          TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder()
+              .setExecutionBlockId(executionBlockId.toString())
+              .setQueryMasterHost(myAddr.getHostName())
+              .setQueryMasterPort(myAddr.getPort())
+              .setNodeId(container.getNodeId().toString())
+              .setContainerId(container.getId().toString())
+              .setQueryOutputPath(context.getOutputPath().toString())
+              .build();
+
+      tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());
+    } catch (Exception e) {
+      //TODO retry
+      LOG.error(e.getMessage(), e);
+    } finally {
+      if(tajoWorkerRpc != null) {
+        (new AyncRpcClose(tajoWorkerRpc)).start();
+      }
+    }
+  }
+
+  class AyncRpcClose extends Thread {
+    ProtoAsyncRpcClient client;
+    public AyncRpcClose(ProtoAsyncRpcClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+      client.close();
+    }
+  }
+
+  @Override
+  public synchronized void stopContainer() {
+    LOG.info("Release TajoWorker Resource: " + executionBlockId + "," + containerID + ", state:" + this.state);
+    if(isCompletelyDone()) {
+      LOG.info("====> Container already stopped:" + containerID);
+      return;
+    }
+    if(this.state == ContainerState.PREP) {
+      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+    } else {
+      try {
+        releaseWorkerResource(context, executionBlockId, ((TajoWorkerContainer)container).getWorkerResource());
+        context.getResourceAllocator().removeContainer(containerID);
+        this.state = ContainerState.DONE;
+      } catch (Throwable t) {
+        // ignore the cleanup failure
+        String message = "cleanup failed for container "
+            + this.containerID + " : "
+            + StringUtils.stringifyException(t);
+        LOG.warn(message);
+        this.state = ContainerState.DONE;
+        return;
+      }
+    }
+  }
+
+  public static void releaseWorkerResource(QueryMasterTask.QueryContext context,
+                                           ExecutionBlockId executionBlockId,
+                                           WorkerResource workerResource) throws Exception {
+    List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
+    workerResources.add(workerResource);
+
+    releaseWorkerResource(context, executionBlockId, workerResources);
+  }
+
+  public static void releaseWorkerResource(QueryMasterTask.QueryContext context,
+                                           ExecutionBlockId executionBlockId,
+                                           List<WorkerResource> workerResources) throws Exception {
+    List<TajoMasterProtocol.WorkerResourceProto> workerResourceProtos =
+        new ArrayList<TajoMasterProtocol.WorkerResourceProto>();
+
+    for(WorkerResource eahWorkerResource: workerResources) {
+      workerResourceProtos.add(TajoMasterProtocol.WorkerResourceProto.newBuilder()
+          .setWorkerHostAndPort(eahWorkerResource.getId())
+          .setExecutionBlockId(executionBlockId.getProto())
+          .setMemoryMBSlots(eahWorkerResource.getMemoryMBSlots())
+          .setDiskSlots(eahWorkerResource.getDiskSlots())
+          .build()
+      );
+    }
+    context.getQueryMasterContext().getWorkerContext().getTajoMasterRpcClient()
+        .releaseWorkerResource(null,
+            TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder()
+                .addAllWorkerResources(workerResourceProtos)
+                .build(),
+            NullCallback.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index b84b51b..f22472a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.master;
 
-import com.google.common.collect.Maps;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,12 +31,9 @@ import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
@@ -47,16 +43,15 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.Country;
 import org.apache.tajo.engine.function.InCountry;
 import org.apache.tajo.engine.function.builtin.*;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
-import org.apache.tajo.master.querymaster.QueryMasterManagerService;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.webapp.StaticHttpServer;
 
+import java.lang.reflect.Constructor;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 public class TajoMaster extends CompositeService {
 
@@ -79,12 +74,14 @@ public class TajoMaster extends CompositeService {
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
-  private QueryMasterManagerService queryMasterManagerService;
-  private YarnRPC yarnRPC;
+  private TajoMasterService tajoMasterService;
 
+  private WorkerResourceManager resourceManager;
   //Web Server
   private StaticHttpServer webServer;
 
+  private QueryJobManager queryJobManager;
+
   public TajoMaster() throws Exception {
     super(TajoMaster.class.getName());
   }
@@ -96,16 +93,23 @@ public class TajoMaster extends CompositeService {
     context = new MasterContext(conf);
     clock = new SystemClock();
 
-
     try {
       RackResolver.init(conf);
 
+//      this.conf.writeXml(System.out);
+      String className = this.conf.get("tajo.resource.manager", TajoWorkerResourceManager.class.getCanonicalName());
+      Class<WorkerResourceManager> resourceManagerClass =
+          (Class<WorkerResourceManager>)Class.forName(className);
+
+      Constructor<WorkerResourceManager> constructor = resourceManagerClass.getConstructor(MasterContext.class);
+      resourceManager = constructor.newInstance(context);
+      resourceManager.init(context.getConf());
+
+      //TODO WebServer port configurable
       webServer = StaticHttpServer.getInstance(this ,"admin", null, 8080 ,
           true, null, context.getConf(), null);
       webServer.start();
 
-      QueryIdFactory.reset();
-
       // Get the tajo base dir
       this.basePath = new Path(conf.getVar(ConfVars.ROOT_DIR));
       LOG.info("Tajo Root dir is set " + basePath);
@@ -128,8 +132,6 @@ public class TajoMaster extends CompositeService {
         LOG.info("Warehouse dir (" + wareHousePath + ") is created");
       }
 
-      yarnRPC = YarnRPC.create(conf);
-
       this.dispatcher = new AsyncDispatcher();
       addIfService(dispatcher);
 
@@ -149,15 +151,19 @@ public class TajoMaster extends CompositeService {
       globalEngine = new GlobalEngine(context);
       addIfService(globalEngine);
 
+      queryJobManager = new QueryJobManager(context);
+      addIfService(queryJobManager);
+
       tajoMasterClientService = new TajoMasterClientService(context);
       addIfService(tajoMasterClientService);
 
-      queryMasterManagerService = new QueryMasterManagerService(context);
-      addIfService(queryMasterManagerService);
+      tajoMasterService = new TajoMasterService(context);
+      addIfService(tajoMasterService);
     } catch (Exception e) {
-       e.printStackTrace();
+       LOG.error(e.getMessage(), e);
     }
 
+    LOG.info("====> Tajo master started");
     super.init(conf);
   }
 
@@ -282,10 +288,6 @@ public class TajoMaster extends CompositeService {
       LOG.error(e);
     }
 
-    for(QueryMasterManager eachQuery: getContext().getAllQueries().values()) {
-      eachQuery.stop();
-    }
-
     super.stop();
     LOG.info("TajoMaster main thread exiting");
   }
@@ -310,40 +312,7 @@ public class TajoMaster extends CompositeService {
     return this.storeManager;
   }
 
-  // TODO - to be improved
-  public Collection<QueryMasterProtocol.TaskStatusProto> getProgressQueries() {
-    return null;
-  }
-
-//  private class QueryEventDispatcher implements EventHandler<QueryEvent> {
-//    @Override
-//    public void handle(QueryEvent queryEvent) {
-//      LOG.info("QueryEvent: " + queryEvent.getQueryId());
-//      LOG.info("Found: " + context.getQuery(queryEvent.getQueryId()).getContext().getQueryId());
-//      context.getQuery(queryEvent.getQueryId()).handle(queryEvent);
-//    }
-//  }
-
-  public static void main(String[] args) throws Exception {
-    StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
-
-    try {
-      TajoMaster master = new TajoMaster();
-      ShutdownHookManager.get().addShutdownHook(
-          new CompositeServiceShutdownHook(master),
-          SHUTDOWN_HOOK_PRIORITY);
-      TajoConf conf = new TajoConf(new YarnConfiguration());
-      master.init(conf);
-      master.start();
-    } catch (Throwable t) {
-      LOG.fatal("Error starting TajoMaster", t);
-      System.exit(-1);
-    }
-  }
-
   public class MasterContext {
-    //private final Map<QueryId, QueryMaster> queries = Maps.newConcurrentMap();
-    private final Map<QueryId, QueryMasterManager> queries = Maps.newConcurrentMap();
     private final TajoConf conf;
 
     public MasterContext(TajoConf conf) {
@@ -358,20 +327,12 @@ public class TajoMaster extends CompositeService {
       return clock;
     }
 
-    public QueryMasterManager getQuery(QueryId queryId) {
-      return queries.get(queryId);
+    public QueryJobManager getQueryJobManager() {
+      return queryJobManager;
     }
 
-    public Map<QueryId, QueryMasterManager> getAllQueries() {
-      return queries;
-    }
-
-    public void addQuery(QueryId queryId, QueryMasterManager queryMasterManager) {
-      queries.put(queryId, queryMasterManager);
-    }
-
-    public AsyncDispatcher getDispatcher() {
-      return dispatcher;
+    public WorkerResourceManager getResourceManager() {
+      return resourceManager;
     }
 
     public EventHandler getEventHandler() {
@@ -390,16 +351,23 @@ public class TajoMaster extends CompositeService {
       return storeManager;
     }
 
-    public YarnRPC getYarnRPC() {
-      return yarnRPC;
+    public TajoMasterService getTajoMasterService() {
+      return tajoMasterService;
     }
+  }
 
-    public TajoMasterClientService getClientService() {
-      return tajoMasterClientService;
-    }
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(TajoMaster.class, args, LOG);
 
-    public QueryMasterManagerService getQueryMasterManagerService() {
-      return queryMasterManagerService;
+    try {
+      TajoMaster master = new TajoMaster();
+      ShutdownHookManager.get().addShutdownHook(new CompositeServiceShutdownHook(master), SHUTDOWN_HOOK_PRIORITY);
+      TajoConf conf = new TajoConf(new YarnConfiguration());
+      master.init(conf);
+      master.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting TajoMaster", t);
+      System.exit(-1);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index ed1376c..8578b64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -26,9 +26,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAttemptIdProto;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
@@ -37,18 +39,18 @@ import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.query.exception.SQLSyntaxError;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.querymaster.QueryMasterManager;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.rpc.ProtoBlockingRpcServer;
 import org.apache.tajo.rpc.RemoteException;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -88,9 +90,10 @@ public class TajoMasterClientService extends AbstractService {
       LOG.error(e);
     }
     server.start();
-    bindAddress = NetUtils.getConnectAddress(server.getListenAddress());
-    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddress));
-    LOG.info("TajoMasterClientService startup");
+    bindAddress = server.getListenAddress();
+    this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS,
+        org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+    LOG.info("Instantiated TajoMasterClientService at " + this.bindAddress);
     super.start();
   }
 
@@ -99,7 +102,6 @@ public class TajoMasterClientService extends AbstractService {
     if (server != null) {
       server.shutdown();
     }
-    LOG.info("TajoMasterClientService shutdown");
     super.stop();
   }
 
@@ -123,40 +125,26 @@ public class TajoMasterClientService extends AbstractService {
     }
 
     @Override
-    public SubmitQueryResponse submitQuery(RpcController controller,
+    public GetQueryStatusResponse submitQuery(RpcController controller,
                                            QueryRequest request)
         throws ServiceException {
 
-      QueryId queryId;
-      SubmitQueryResponse.Builder build = SubmitQueryResponse.newBuilder();
       try {
-        queryId = context.getGlobalEngine().executeQuery(request.getQuery());
-      } catch (SQLSyntaxError e) {
-        build.setResultCode(ResultCode.ERROR);
-        build.setErrorMessage(e.getMessage());
-        return build.build();
-
-      } catch (Exception e) {
-        build.setResultCode(ResultCode.ERROR);
-        String msg = e.getMessage();
-        if (msg == null) {
-          msg = "Internal Error";
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Query [" + request.getQuery() + "] is submitted");
         }
-
-        if (LOG.isDebugEnabled()) {
-          LOG.error(msg, e);
+        return context.getGlobalEngine().executeQuery(request.getQuery());
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        ClientProtos.GetQueryStatusResponse.Builder responseBuilder = ClientProtos.GetQueryStatusResponse.newBuilder();
+        responseBuilder.setResultCode(ResultCode.ERROR);
+        if (e.getMessage() != null) {
+          responseBuilder.setErrorMessage(ExceptionUtils.getStackTrace(e));
         } else {
-          LOG.error(msg);
+          responseBuilder.setErrorMessage("Internal Error");
         }
-        build.setErrorMessage(msg);
-        return build.build();
+        return responseBuilder.build();
       }
-
-      LOG.info("Query " + queryId + " is submitted");
-      build.setResultCode(ResultCode.OK);
-      build.setQueryId(queryId.getProto());
-
-      return build.build();
     }
 
     @Override
@@ -183,13 +171,17 @@ public class TajoMasterClientService extends AbstractService {
                                                  GetQueryResultRequest request)
         throws ServiceException {
       QueryId queryId = new QueryId(request.getQueryId());
-      QueryMasterManager queryMasterManager = context.getQuery(queryId);
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
 
+      }
+      QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+      QueryInfo queryInfo = queryInProgress.getQueryInfo();
       GetQueryResultResponse.Builder builder
           = GetQueryResultResponse.newBuilder();
-      switch (queryMasterManager.getState()) {
+      switch (queryInfo.getQueryState()) {
         case QUERY_SUCCEEDED:
-          builder.setTableDesc((TableDescProto) queryMasterManager.getResultDesc().getProto());
+          // TODO check this logic needed
+          //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto());
           break;
         case QUERY_FAILED:
         case QUERY_ERROR:
@@ -218,23 +210,25 @@ public class TajoMasterClientService extends AbstractService {
       QueryId queryId = new QueryId(request.getQueryId());
       builder.setQueryId(request.getQueryId());
 
-      if (queryId.equals(TajoIdUtils.NullQueryId)) {
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
         builder.setResultCode(ResultCode.OK);
         builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
       } else {
-        QueryMasterManager queryMasterManager = context.getQuery(queryId);
-        if (queryMasterManager != null) {
+        QueryInProgress queryInProgress = context.getQueryJobManager().getQueryInProgress(queryId);
+        if (queryInProgress != null) {
+          QueryInfo queryInfo = queryInProgress.getQueryInfo();
           builder.setResultCode(ResultCode.OK);
-          builder.setState(queryMasterManager.getState());
-          builder.setProgress(queryMasterManager.getProgress());
-          builder.setSubmitTime(queryMasterManager.getAppSubmitTime());
-          if(queryMasterManager.getQueryMasterHost() != null) {
-            builder.setQueryMasterHost(queryMasterManager.getQueryMasterHost());
-            builder.setQueryMasterPort(queryMasterManager.getQueryMasterClientPort());
+          builder.setState(queryInfo.getQueryState());
+          builder.setProgress(queryInfo.getProgress());
+          builder.setSubmitTime(queryInfo.getStartTime());
+          if(queryInfo.getQueryMasterHost() != null) {
+            builder.setQueryMasterHost(queryInfo.getQueryMasterHost());
+            builder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
           }
-
-          if (queryMasterManager.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(queryMasterManager.getFinishTime());
+          //builder.setInitTime(queryJobManager.getInitializationTime());
+          //builder.setHasResult(!queryJobManager.isCreateTableStmt());
+          if (queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(queryInfo.getFinishTime());
           } else {
             builder.setFinishTime(System.currentTimeMillis());
           }
@@ -249,11 +243,12 @@ public class TajoMasterClientService extends AbstractService {
 
     @Override
     public BoolProto killQuery(RpcController controller,
-                               ApplicationAttemptIdProto request)
+                               TajoIdProtos.QueryIdProto request)
         throws ServiceException {
       QueryId queryId = new QueryId(request);
-      QueryMasterManager queryMasterManager = context.getQuery(queryId);
-      //queryMasterManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
+      QueryJobManager queryJobManager = context.getQueryJobManager();
+      //TODO KHJ, change QueryJobManager to event handler
+      //queryJobManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
 
       return BOOL_TRUE;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
new file mode 100644
index 0000000..f0a4618
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryJobManager;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+public class TajoMasterService extends AbstractService {
+  private final static Log LOG = LogFactory.getLog(TajoMasterService.class);
+
+  private final TajoMaster.MasterContext context;
+  private final TajoConf conf;
+  private final TajoMasterServiceHandler masterHandler;
+  private ProtoAsyncRpcServer server;
+  private InetSocketAddress bindAddress;
+
+  private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build();
+  private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build();
+
+  public TajoMasterService(TajoMaster.MasterContext context) {
+    super(TajoMasterService.class.getName());
+    this.context = context;
+    this.conf = context.getConf();
+    this.masterHandler = new TajoMasterServiceHandler();
+  }
+
+  @Override
+  public void start() {
+    // TODO resolve hostname
+    String confMasterServiceAddr = conf.getVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS);
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr);
+    try {
+      server = new ProtoAsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa);
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+    server.start();
+    bindAddress = server.getListenAddress();
+    this.conf.setVar(TajoConf.ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+        org.apache.tajo.util.NetUtils.getIpPortString(bindAddress));
+    LOG.info("Instantiated TajoMasterService at " + this.bindAddress);
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if(server != null) {
+      server.shutdown();
+      server = null;
+    }
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  public class TajoMasterServiceHandler
+      implements TajoMasterProtocol.TajoMasterProtocolService.Interface {
+    @Override
+    public void heartbeat(
+        RpcController controller,
+        TajoMasterProtocol.TajoHeartbeat request, RpcCallback<TajoMasterProtocol.TajoHeartbeatResponse> done) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Received QueryHeartbeat:" + request.getTajoWorkerHost() + ":" + request.getTajoWorkerPort());
+      }
+
+      TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null;
+      if(request.hasQueryId()) {
+        QueryId queryId = new QueryId(request.getQueryId());
+
+        //heartbeat from querymaster
+        //LOG.info("Received QueryHeartbeat:" + queryId + "," + request);
+        QueryJobManager queryJobManager = context.getQueryJobManager();
+        command = queryJobManager.queryHeartbeat(request);
+      } else {
+        //heartbeat from TajoWorker
+        context.getResourceManager().workerHeartbeat(request);
+      }
+
+      //ApplicationAttemptId attemptId = queryJobManager.getAppAttemptId();
+      //String attemptIdStr = attemptId == null ? null : attemptId.toString();
+      TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder();
+      builder.setHeartbeatResult(BOOL_TRUE);
+      if(command != null) {
+        builder.setResponseCommand(command);
+      }
+      done.run(builder.build());
+    }
+
+    @Override
+    public void allocateWorkerResources(
+        RpcController controller,
+        TajoMasterProtocol.WorkerResourceAllocationRequest request,
+        RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> done) {
+      context.getResourceManager().allocateWorkerResources(request, done);
+
+//      List<String> workerHosts = new ArrayList<String>();
+//      for(WorkerResource eachWorker: workerResources) {
+//        workerHosts.add(eachWorker.getAllocatedHost() + ":" + eachWorker.getPorts()[0]);
+//      }
+//
+//      done.run(TajoMasterProtocol.WorkerResourceAllocationResponse.newBuilder()
+//          .setExecutionBlockId(request.getExecutionBlockId())
+//          .addAllAllocatedWorks(workerHosts)
+//          .build()
+//      );
+    }
+
+    @Override
+    public void releaseWorkerResource(RpcController controller,
+                                           TajoMasterProtocol.WorkerResourceReleaseRequest request,
+                                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+      List<TajoMasterProtocol.WorkerResourceProto> workerResources = request.getWorkerResourcesList();
+      for(TajoMasterProtocol.WorkerResourceProto eachWorkerResource: workerResources) {
+        WorkerResource workerResource = new WorkerResource();
+        String[] tokens = eachWorkerResource.getWorkerHostAndPort().split(":");
+        workerResource.setAllocatedHost(tokens[0]);
+        workerResource.setPorts(new int[]{Integer.parseInt(tokens[1])});
+        workerResource.setMemoryMBSlots(eachWorkerResource.getMemoryMBSlots());
+        workerResource.setDiskSlots(eachWorkerResource.getDiskSlots());
+
+        LOG.info("====> releaseWorkerResource:" + workerResource);
+        context.getResourceManager().releaseWorkerResource(
+            new QueryId(eachWorkerResource.getExecutionBlockId().getQueryId()),
+            workerResource);
+      }
+      done.run(BOOL_TRUE);
+    }
+
+    @Override
+    public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request,
+                                RpcCallback<BoolProto> done) {
+      context.getQueryJobManager().stopQuery(new QueryId(request));
+      done.run(BOOL_TRUE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
index 93aaa5d..1e6655c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerGroupEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master;
 
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 
 import java.util.Collection;
@@ -31,17 +31,21 @@ public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
     CONTAINER_REMOTE_CLEANUP
   }
 
-  protected final SubQueryId subQueryId;
+  protected final ExecutionBlockId executionBlockId;
   protected final Collection<Container> containers;
   public TaskRunnerGroupEvent(EventType eventType,
-                              SubQueryId subQueryId,
+                              ExecutionBlockId executionBlockId,
                               Collection<Container> containers) {
     super(eventType);
-    this.subQueryId = subQueryId;
+    this.executionBlockId = executionBlockId;
     this.containers = containers;
   }
 
   public Collection<Container> getContainers() {
     return containers;
   }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
 }