You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/14 08:48:03 UTC

[5/8] TAJO-91: Launch QueryMaster on NodeManager per query. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
index 8e0abd0..d8ddb46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
@@ -21,76 +21,45 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryConf;
 import org.apache.tajo.SubQueryId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.QueryMaster.QueryContext;
 import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.event.QueryEvent;
 import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.worker.TaskRunner;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
 
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
-  private final YarnRPC yarnRPC;
-  private final static RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
   private QueryContext context;
-  private final String taskListenerHost;
-  private final int taskListenerPort;
+  private final String queryMasterHost;
+  private final int queryMasterPort;
 
   // For ContainerLauncherSpec
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-  private static final Object classpathLock = new Object();
-  private Object commonContainerSpecLock = new Object();
   private ContainerLaunchContext commonContainerSpec = null;
 
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
   /** for launching TaskRunners in parallel */
   private final ExecutorService executorService;
 
   public TaskRunnerLauncherImpl(QueryContext context) {
     super(TaskRunnerLauncherImpl.class.getName());
     this.context = context;
-    taskListenerHost = context.getTaskListener().getHostName();
-    taskListenerPort = context.getTaskListener().getPort();
-    yarnRPC = context.getYarnRPC();
+    queryMasterHost = context.getQueryMasterServiceAddress().getHostName();
+    queryMasterPort = context.getQueryMasterServiceAddress().getPort();
     executorService = Executors.newFixedThreadPool(
         context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
   }
@@ -100,7 +69,22 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
   }
 
   public void stop() {
-    executorService.shutdown();
+    executorService.shutdownNow();
+
+    while(!executorService.isTerminated()) {
+      LOG.info("executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+    }
+    Map<ContainerId, ContainerProxy> containers = context.getContainers();
+    for(ContainerProxy eachProxy: containers.values()) {
+      try {
+        eachProxy.kill();
+      } catch (Exception e) {
+      }
+    }
     super.stop();
   }
 
@@ -114,427 +98,82 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
   }
 
   private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
+    commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig());
     for (Container container : containers) {
-      final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
-      executorService.submit(new LaunchRunner(proxy));
+      final ContainerProxy proxy =
+          new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container, subQueryId);
+      executorService.submit(new LaunchRunner(container.getId(), proxy));
     }
   }
 
   private class LaunchRunner implements Runnable {
     private final ContainerProxy proxy;
-    public LaunchRunner(ContainerProxy proxy) {
+    private final ContainerId id;
+
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
       this.proxy = proxy;
+      this.id = id;
     }
     @Override
     public void run() {
-      proxy.launch();
+      proxy.launch(commonContainerSpec);
+      LOG.info("ContainerProxy started:" + id);
     }
   }
 
   private void killTaskRunners(Collection<Container> containers) {
     for (Container container : containers) {
       final ContainerProxy proxy = context.getContainer(container.getId());
-      executorService.submit(new KillRunner(proxy));
+      executorService.submit(new KillRunner(container.getId(), proxy));
     }
   }
 
   private class KillRunner implements Runnable {
     private final ContainerProxy proxy;
-    public KillRunner(ContainerProxy proxy) {
+    private final ContainerId id;
+    public KillRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
       this.proxy = proxy;
     }
 
     @Override
     public void run() {
       proxy.kill();
+      LOG.info("ContainerProxy killed:" + id);
     }
   }
 
-
-  /**
-   * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: We already construct
-   * a parent CLC and use it for all the containers, so this should go away
-   * once the mr-generated-classpath stuff is gone.
-   */
-  private static String getInitialClasspath(Configuration conf) {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-
-  private ContainerLaunchContext createCommonContainerLaunchContext() {
-    TajoConf conf = (TajoConf) getConfig();
-
-    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-    try {
-      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the env variables to be setup
-    ////////////////////////////////////////////////////////////////////////////
-    LOG.info("Set the environment for the application master");
-
-    Map<String, String> environment = new HashMap<String, String>();
-    //String initialClassPath = getInitialClasspath(conf);
-    environment.put(Environment.SHELL.name(), "/bin/bash");
-    environment.put(Environment.JAVA_HOME.name(), System.getenv(Environment.JAVA_HOME.name()));
-
-    // TODO - to be improved with org.apache.tajo.sh shell script
-    Properties prop = System.getProperties();
-    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE")) {
-      environment.put(Environment.CLASSPATH.name(), prop.getProperty(
-          "java.class.path", null));
-    } else {
-      // Add AppMaster.jar location to classpath
-      // At some point we should not be required to add
-      // the hadoop specific classpaths to the env.
-      // It should be provided out of the box.
-      // For now setting all required classpaths including
-      // the classpath to "." for the application jar
-      StringBuilder classPathEnv = new StringBuilder("./");
-      //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
-      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
-        classPathEnv.append(':');
-        classPathEnv.append(c.trim());
-      }
-
-      classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
-      classPathEnv.append(":./log4j.properties:./*");
-      environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_COMMON_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_HDFS_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put(
-          Environment.HADOOP_YARN_HOME.name(),
-          System.getenv("HADOOP_HOME"));
-      environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
-      environment.put(Environment.CLASSPATH.name(), classPathEnv.toString());
-    }
-
-    ctx.setEnvironment(environment);
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    FileSystem fs = null;
-
-
-    LOG.info("defaultFS: " + conf.get("fs.default.name"));
-    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-    try {
-      fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    FileContext fsCtx = null;
-    try {
-      fsCtx = FileContext.getFileContext(getConfig());
-    } catch (UnsupportedFileSystemException e) {
-      e.printStackTrace();
-    }
-
-    LOG.info("Writing a QueryConf to HDFS and add to local environment");
-    Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
-    try {
-      writeConf(conf, queryConfPath);
-
-      LocalResource queryConfSrc = createApplicationResource(fsCtx,
-          queryConfPath, LocalResourceType.FILE);
-      localResources.put(QueryConf.FILENAME,  queryConfSrc);
-
-      ctx.setLocalResources(localResources);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    // Add shuffle token
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    try {
-      //LOG.info("Putting shuffle token in serviceData");
-      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
-          PullServerAuxService.serializeMetaData(0));
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-    }
-    ctx.setServiceData(serviceData);
-
-    return ctx;
-  }
-
-  protected ContainerManager getCMProxy(ContainerId containerID,
-                                        final String containerManagerBindAddr,
-                                        ContainerToken containerToken)
-      throws IOException {
-    String [] hosts = containerManagerBindAddr.split(":");
-    final InetSocketAddress cmAddr =
-        new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
-          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
-      // the user in createRemoteUser in this context has to be ContainerID
-      user = UserGroupInformation.createRemoteUser(containerID.toString());
-      user.addToken(token);
-    }
-
-    ContainerManager proxy = user
-        .doAs(new PrivilegedAction<ContainerManager>() {
-          @Override
-          public ContainerManager run() {
-            return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
-                cmAddr, getConfig());
-          }
-        });
-    return proxy;
-  }
-
-  private LocalResource createApplicationResource(FileContext fs,
-                                                  Path p, LocalResourceType type)
-      throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-
-  private void writeConf(Configuration conf, Path queryConfFile)
-      throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-        FileSystem.create(fs, queryConfFile,
-            new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  private static enum ContainerState {
-    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
-  }
-
-  public class ContainerProxy {
-    private ContainerState state;
-    // store enough information to be able to cleanup the container
-    private Container container;
-    private ContainerId containerID;
-    final private String containerMgrAddress;
-    private ContainerToken containerToken;
-    private String hostName;
-    private int port = -1;
+  public class TaskRunnerContainerProxy extends ContainerProxy {
     private final SubQueryId subQueryId;
 
-    public ContainerProxy(Container container, SubQueryId subQueryId) {
-      this.state = ContainerState.PREP;
-      this.container = container;
-      this.containerID = container.getId();
-      NodeId nodeId = container.getNodeId();
-      this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
-      this.containerToken = container.getContainerToken();
+    public TaskRunnerContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC,
+                                    Container container, SubQueryId subQueryId) {
+      super(context, conf, yarnRPC, container);
       this.subQueryId = subQueryId;
     }
 
-    public synchronized boolean isCompletelyDone() {
-      return state == ContainerState.DONE || state == ContainerState.FAILED;
-    }
-
-    @SuppressWarnings("unchecked")
-    public synchronized void launch() {
-      LOG.info("Launching Container with Id: " + containerID);
-      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
-        state = ContainerState.DONE;
-        LOG.error("Container (" + containerID + " was killed before it was launched");
-        return;
-      }
-
-      ContainerManager proxy = null;
-      try {
-
-        proxy = getCMProxy(containerID, containerMgrAddress,
-            containerToken);
-
-        // Construct the actual Container
-        ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
-
-        // Now launch the actual container
-        StartContainerRequest startRequest = Records
-            .newRecord(StartContainerRequest.class);
-        startRequest.setContainerLaunchContext(containerLaunchContext);
-        StartContainerResponse response = proxy.startContainer(startRequest);
-
-        ByteBuffer portInfo = response
-            .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
-
-        if(portInfo != null) {
-          port = PullServerAuxService.deserializeMetaData(portInfo);
-        }
-
-        LOG.info("PullServer port returned by ContainerManager for "
-            + containerID + " : " + port);
-
-        if(port < 0) {
-          this.state = ContainerState.FAILED;
-          throw new IllegalStateException("Invalid shuffle port number "
-              + port + " returned for " + containerID);
-        }
-
-        // after launching, send launched event to task attempt to move
-        // it from ASSIGNED to RUNNING state
-//      context.getEventHandler().handle(new AMContainerEventLaunched(containerID, port));
-
-        // this is workaround code
-        context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-
-        this.state = ContainerState.RUNNING;
-        this.hostName = containerMgrAddress.split(":")[0];
-        context.addContainer(containerID, this);
-      } catch (Throwable t) {
-        String message = "Container launch failed for " + containerID + " : "
-            + StringUtils.stringifyException(t);
-        this.state = ContainerState.FAILED;
-        LOG.error(message);
-      } finally {
-        if (proxy != null) {
-          yarnRPC.stopProxy(proxy, getConfig());
-        }
-      }
+    @Override
+    protected void containerStarted() {
+      context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
     }
 
-    public synchronized void kill() {
-
-      if(isCompletelyDone()) {
-        return;
-      }
-      if(this.state == ContainerState.PREP) {
-        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-      } else {
-        LOG.info("KILLING " + containerID);
-
-        ContainerManager proxy = null;
-        try {
-          proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-              this.containerToken);
-
-          // kill the remote container if already launched
-          StopContainerRequest stopRequest = Records
-              .newRecord(StopContainerRequest.class);
-          stopRequest.setContainerId(this.containerID);
-          proxy.stopContainer(stopRequest);
-          // If stopContainer returns without an error, assuming the stop made
-          // it over to the NodeManager.
-//          context.getEventHandler().handle(
-//              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
-          context.removeContainer(containerID);
-        } catch (Throwable t) {
-
-          // ignore the cleanup failure
-          String message = "cleanup failed for container "
-              + this.containerID + " : "
-              + StringUtils.stringifyException(t);
-//          context.getEventHandler().handle(
-//              new AMContainerEventStopFailed(containerID, message));
-          LOG.warn(message);
-          this.state = ContainerState.DONE;
-          return;
-        } finally {
-          if (proxy != null) {
-            yarnRPC.stopProxy(proxy, getConfig());
-          }
-        }
-        this.state = ContainerState.DONE;
-      }
+    @Override
+    protected String getId() {
+      return subQueryId.toString();
     }
 
-    public ContainerLaunchContext createContainerLaunchContext() {
-      synchronized (commonContainerSpecLock) {
-        if (commonContainerSpec == null) {
-          commonContainerSpec = createCommonContainerLaunchContext();
-        }
-      }
-
-      // Setup environment by cloning from common env.
-      Map<String, String> env = commonContainerSpec.getEnvironment();
-      Map<String, String> myEnv = new HashMap<String, String>(env.size());
-      myEnv.putAll(env);
-
-      // Duplicate the ByteBuffers for access by multiple containers.
-      Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-      for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
-          .getServiceData().entrySet()) {
-        myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-      }
-
-      ////////////////////////////////////////////////////////////////////////////
-      // Set the local resources
-      ////////////////////////////////////////////////////////////////////////////
-      // Set the necessary command to execute the application master
-      Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-      // Set java executable command
-      //LOG.info("Setting up app master command");
-      vargs.add("${JAVA_HOME}" + "/bin/java");
-      // Set Xmx based on am memory size
-      vargs.add("-Xmx2000m");
-      // Set Remote Debugging
-      //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
-      //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-      //}
-      // Set class name
-      vargs.add("org.apache.tajo.worker.TaskRunner");
-      vargs.add(taskListenerHost); // tasklistener hostname
-      vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
-      vargs.add(subQueryId.toString()); // subqueryId
-      vargs.add(containerMgrAddress); // nodeId
-      vargs.add(containerID.toString()); // containerId
-
-      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-      // Get final commmand
-      StringBuilder command = new StringBuilder();
-      for (CharSequence str : vargs) {
-        command.append(str).append(" ");
-      }
-
-      LOG.info("Completed setting up TaskRunner command " + command.toString());
-      List<String> commands = new ArrayList<String>();
-      commands.add(command.toString());
-
-      return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
-          container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
-          myServiceData, null, new HashMap<ApplicationAccessType, String>());
+    @Override
+    protected String getRunnerClass() {
+      return TaskRunner.class.getCanonicalName();
     }
 
-    public String getHostName() {
-      return this.hostName;
-    }
+    @Override
+    protected Vector<CharSequence> getTaskParams() {
+      Vector<CharSequence> taskParams = new Vector<CharSequence>();
+      taskParams.add(queryMasterHost); // queryMaster hostname
+      taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
 
-    public int getPullServerPort() {
-      return this.port;
+      return taskParams;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
deleted file mode 100644
index ce16897..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerListener.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.event.TaskAttemptStatusUpdateEvent;
-import org.apache.tajo.master.event.TaskCompletionEvent;
-import org.apache.tajo.master.event.TaskFatalErrorEvent;
-import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-public class TaskRunnerListener extends AbstractService
-    implements MasterWorkerProtocolService.Interface {
-  
-  private final static Log LOG = LogFactory.getLog(
-      org.apache.tajo.master.cluster.WorkerListener.class);
-  private QueryContext context;
-  private ProtoAsyncRpcServer rpcServer;
-  private InetSocketAddress bindAddr;
-  private String addr;
-  
-  public TaskRunnerListener(final QueryContext context) throws Exception {
-    super(org.apache.tajo.master.cluster.WorkerListener.class.getName());
-    this.context = context;
-
-
-    InetSocketAddress initIsa =
-        new InetSocketAddress(InetAddress.getLocalHost(), 0);
-    if (initIsa.getAddress() == null) {
-      throw new IllegalArgumentException("Failed resolve of " + initIsa);
-    }
-    try {
-      this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
-          this, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    this.rpcServer.start();
-    this.bindAddr = rpcServer.getBindAddress();
-    this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    // Setup RPC server
-    try {
-      InetSocketAddress initIsa =
-          new InetSocketAddress(InetAddress.getLocalHost(), 0);
-      if (initIsa.getAddress() == null) {
-        throw new IllegalArgumentException("Failed resolve of " + initIsa);
-      }
-
-      this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
-          this, initIsa);
-
-      this.rpcServer.start();
-      this.bindAddr = rpcServer.getBindAddress();
-      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-
-    // Get the master address
-    LOG.info(org.apache.tajo.master.cluster.WorkerListener.class.getSimpleName() + " is bind to " + addr);
-    context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-
-
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    rpcServer.shutdown();
-    super.stop();
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-  
-  public String getAddress() {
-    return this.addr;
-  }
-
-  static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
-  @Override
-  public void getTask(RpcController controller, ContainerIdProto request,
-                      RpcCallback<QueryUnitRequestProto> done) {
-    context.getEventHandler().handle(new TaskRequestEvent(
-        new ContainerIdPBImpl(request), done));
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, TaskStatusProto request,
-                           RpcCallback<BoolProto> done) {
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
-    context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
-        request));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void ping(RpcController controller,
-                   QueryUnitAttemptIdProto attemptIdProto,
-                   RpcCallback<BoolProto> done) {
-    // TODO - to be completed
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-//    context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getSubQueryId()).
-//        getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-//        resetExpireTime();
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void fatalError(RpcController controller, TaskFatalErrorReport report,
-                         RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskFatalErrorEvent(report));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void done(RpcController controller, TaskCompletionReport report,
-                       RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskCompletionEvent(report));
-    done.run(TRUE_PROTO);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 381c333..62e702d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -29,18 +29,18 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.SubQueryId;
-import org.apache.tajo.engine.MasterWorkerProtos;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.QueryMaster.QueryContext;
-import org.apache.tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
 import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
 import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 import org.apache.tajo.master.event.TaskScheduleEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.util.TajoIdUtils;
 
@@ -96,9 +96,11 @@ public class TaskSchedulerImpl extends AbstractService
             event = eventQueue.take();
             handleEvent(event);
           } catch (InterruptedException e) {
-            LOG.error("Returning, iterrupted : " + e);
+            //LOG.error("Returning, iterrupted : " + e);
+            break;
           }
         }
+        LOG.info("TaskScheduler eventHandlingThread stopped");
       }
     };
 
@@ -111,11 +113,12 @@ public class TaskSchedulerImpl extends AbstractService
           try {
             Thread.sleep(1000);
           } catch (InterruptedException e) {
-            LOG.warn(e);
+            break;
           }
 
           schedule();
         }
+        LOG.info("TaskScheduler schedulingThread stopped");
       }
     };
 
@@ -124,14 +127,13 @@ public class TaskSchedulerImpl extends AbstractService
   }
 
   private static final QueryUnitAttemptId NULL_ID;
-  private static final MasterWorkerProtos.QueryUnitRequestProto stopTaskRunnerReq;
+  private static final QueryMasterProtocol.QueryUnitRequestProto stopTaskRunnerReq;
   static {
     SubQueryId nullSubQuery =
         QueryIdFactory.newSubQueryId(TajoIdUtils.NullQueryId);
     NULL_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
 
-    MasterWorkerProtos.QueryUnitRequestProto.Builder builder =
-                MasterWorkerProtos.QueryUnitRequestProto.newBuilder();
+    QueryMasterProtocol.QueryUnitRequestProto.Builder builder = QueryMasterProtocol.QueryUnitRequestProto.newBuilder();
     builder.setId(NULL_ID.getProto());
     builder.setShouldDie(true);
     builder.setOutputTable("");
@@ -151,6 +153,7 @@ public class TaskSchedulerImpl extends AbstractService
       req.getCallback().run(stopTaskRunnerReq);
     }
 
+    LOG.info("Task Scheduler stopped");
     super.stop();
   }
 
@@ -301,7 +304,7 @@ public class TaskSchedulerImpl extends AbstractService
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
         ContainerProxy container = context.getContainer(taskRequest.getContainerId());
-        String hostName = container.getHostName();
+        String hostName = container.getTaskHostName();
 
         QueryUnitAttemptId attemptId = null;
 
@@ -360,7 +363,7 @@ public class TaskSchedulerImpl extends AbstractService
 
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(),
-              container.getHostName(), container.getPullServerPort()));
+              container.getTaskHostName(), container.getTaskPort()));
           AssignedRequest.add(attemptId);
 
           totalAssigned++;
@@ -411,7 +414,7 @@ public class TaskSchedulerImpl extends AbstractService
           ContainerProxy container = context.getContainer(
               taskRequest.getContainerId());
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
-              taskRequest.getContainerId(), container.getHostName(), container.getPullServerPort()));
+              taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
           taskRequest.getCallback().run(taskAssign.getProto());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
deleted file mode 100644
index b5bb84c..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/cluster/WorkerListener.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.cluster;
-
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos.QueryUnitAttemptIdProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
-import org.apache.tajo.ipc.MasterWorkerProtocol;
-import org.apache.tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import org.apache.tajo.master.TajoMaster.MasterContext;
-import org.apache.tajo.master.event.TaskAttemptStatusUpdateEvent;
-import org.apache.tajo.master.event.TaskCompletionEvent;
-import org.apache.tajo.master.event.TaskFatalErrorEvent;
-import org.apache.tajo.rpc.ProtoAsyncRpcServer;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-
-import java.net.InetSocketAddress;
-
-public class WorkerListener extends AbstractService
-    implements MasterWorkerProtocolService.Interface {
-  
-  private final static Log LOG = LogFactory.getLog(WorkerListener.class);
-  private MasterContext context;
-  private ProtoAsyncRpcServer rpcServer;
-  private InetSocketAddress bindAddr;
-  private String addr;
-  
-  public WorkerListener(final MasterContext context) throws Exception {
-    super(WorkerListener.class.getName());
-    this.context = context;
-
-    String confMasterAddr = context.getConf().getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS);
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterAddr);
-    if (initIsa.getAddress() == null) {
-      throw new IllegalArgumentException("Failed resolve of " + initIsa);
-    }
-    try {
-      this.rpcServer = new ProtoAsyncRpcServer(MasterWorkerProtocol.class,
-          this, initIsa);
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-    this.rpcServer.start();
-    this.bindAddr = rpcServer.getBindAddress();
-    this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
-
-    // Setup RPC server
-    // Get the master address
-    LOG.info(WorkerListener.class.getSimpleName() + " is bind to " + addr);
-    context.getConf().setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  @Override
-  public void stop() {
-    rpcServer.shutdown();
-    super.stop();
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-  
-  public String getAddress() {
-    return this.addr;
-  }
-
-  static BoolProto TRUE_PROTO = BoolProto.newBuilder().setValue(true).build();
-
-  @Override
-  public void getTask(RpcController controller, ContainerIdProto request,
-                      RpcCallback<QueryUnitRequestProto> done) {
-    //LOG.info("Get TaskRequest from " + request.getHost());
-    //context.getEventHandler().handle(new TaskRequestEvent(new NodeIdPBImpl(request), done));
-  }
-
-  @Override
-  public void statusUpdate(RpcController controller, TaskStatusProto request,
-                           RpcCallback<BoolProto> done) {
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
-    context.getEventHandler().handle(new TaskAttemptStatusUpdateEvent(attemptId,
-        request));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void ping(RpcController controller,
-                   QueryUnitAttemptIdProto attemptIdProto,
-                   RpcCallback<BoolProto> done) {
-    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
-    context.getQuery(attemptId.getQueryId()).getContext().getSubQuery(attemptId.getSubQueryId()).
-        getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
-        resetExpireTime();
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void fatalError(RpcController controller, TaskFatalErrorReport report,
-                         RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskFatalErrorEvent(report));
-    done.run(TRUE_PROTO);
-  }
-
-  @Override
-  public void done(RpcController controller, TaskCompletionReport report,
-                       RpcCallback<BoolProto> done) {
-    context.getEventHandler().handle(new TaskCompletionEvent(report));
-    done.run(TRUE_PROTO);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 4032c67..26c7231 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.SubQueryId;
-import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.querymaster.SubQueryState;
 
 public class SubQueryCompletedEvent extends QueryEvent {
   private final SubQueryId subQueryId;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index 3191639..d85d4f2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
 
 import org.apache.tajo.SubQueryId;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.master.SubQueryState;
+import org.apache.tajo.master.querymaster.SubQueryState;
 
 public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
   private final TableMeta tableMeta;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index 6409b43..bc84011 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskStatusProto;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskStatusProto;
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
   private final TaskStatusProto status;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index b36d69c..e3a4b5f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskCompletionReport;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
 
 public class TaskCompletionEvent extends TaskAttemptEvent {
   private TaskCompletionReport report;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index 3d1c78d..06fb392 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+import org.apache.tajo.ipc.QueryMasterProtocol.TaskFatalErrorReport;
 
 public class TaskFatalErrorEvent extends TaskAttemptEvent {
   private TaskFatalErrorReport report;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 25a8a14..166e103 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,7 @@ package org.apache.tajo.master.event;
 import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryUnitRequestProto;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 
 public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9d020883/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
new file mode 100644
index 0000000..3179abf
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SubQueryId;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableDescImpl;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.master.ExecutionBlock;
+import org.apache.tajo.master.ExecutionBlockCursor;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.storage.StorageManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+
+  // Facilities for Query
+  private final QueryConf conf;
+  private final Clock clock;
+  private String queryStr;
+  private Map<SubQueryId, SubQuery> subqueries;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  private final StorageManager sm;
+  private QueryContext context;
+  private ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private long appSubmitTime;
+  private long startTime;
+  private long initializationTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedSubQueryCount = 0;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  private final Lock readLock;
+  private final Lock writeLock;
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+
+  private static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+      .addTransition(QueryState.QUERY_NEW,
+          EnumSet.of(QueryState.QUERY_INIT, QueryState.QUERY_FAILED),
+          QueryEventType.INIT, new InitTransition())
+
+      .addTransition(QueryState.QUERY_INIT, QueryState.QUERY_RUNNING,
+          QueryEventType.START, new StartTransition())
+
+      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+          QueryEventType.INIT_COMPLETED, new InitCompleteTransition())
+      .addTransition(QueryState.QUERY_RUNNING,
+          EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED,
+              QueryState.QUERY_FAILED),
+          QueryEventType.SUBQUERY_COMPLETED,
+          new SubQueryCompletedTransition())
+      .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+          QueryEventType.INTERNAL_ERROR, new InternalErrorTransition())
+       .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+          QueryEventType.INTERNAL_ERROR)
+
+      .installTopology();
+
+  public Query(final QueryContext context, final QueryId id, Clock clock,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan,
+               final StorageManager sm) {
+    this.context = context;
+    this.conf = context.getConf();
+    this.id = id;
+    this.clock = clock;
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    subqueries = Maps.newHashMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.sm = sm;
+    cursor = new ExecutionBlockCursor(plan);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  public boolean isCreateTableStmt() {
+    return context.isCreateTableQuery();
+  }
+
+//  protected FileSystem getFileSystem(Configuration conf) throws IOException {
+//    return FileSystem.get(conf);
+//  }
+
+  /**
+   * Estimates overall progress as the mean progress of the subqueries that have
+   * been created so far; a subquery still in NEW state contributes zero.
+   *
+   * @return 1.0 once the query (or every created subquery) has succeeded, 0.0 while
+   *         no subquery exists yet, otherwise the average subquery progress
+   */
+  public float getProgress() {
+    if (getStateMachine().getCurrentState() == QueryState.QUERY_SUCCEEDED) {
+      return 1.0f;
+    }
+    if (subqueries.isEmpty()) {
+      // Nothing has been scheduled yet; also avoids dividing by zero below.
+      return 0.0f;
+    }
+
+    final float proportion = 1.0f / (float) subqueries.size();
+    boolean finished = true;
+    float totalProgress = 0.0f;
+    for (SubQuery subquery : subqueries.values()) {
+      SubQueryState subState = subquery.getState();
+      if (subState != SubQueryState.SUCCEEDED) {
+        // A subquery that is still NEW is unfinished too, so it must clear the flag.
+        finished = false;
+      }
+      if (subState != SubQueryState.NEW) {
+        totalProgress += subquery.getProgress() * proportion;
+      }
+    }
+    if (finished) {
+      return 1.0f;
+    }
+    return totalProgress;
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getInitializationTime() {
+    return initializationTime;
+  }
+
+  public void setInitializationTime() {
+    initializationTime = clock.getTime();
+  }
+
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  public List<String> getDiagnostics() { // returns a snapshot of the diagnostics
+    readLock.lock();
+    try {
+      return new ArrayList<String>(diagnostics); // copy: never leak the live list past the lock
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+  
+  public void addSubQuery(SubQuery subquery) {
+    subqueries.put(subquery.getId(), subquery);
+  }
+  
+  public QueryId getId() {
+    return this.id;
+  }
+  
+  public SubQuery getSubQuery(SubQueryId id) {
+    return this.subqueries.get(id);
+  }
+
+  public QueryState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  static class InitTransition
+      implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      query.setStartTime();
+      query.context.setState(QueryState.QUERY_INIT);
+      return QueryState.QUERY_INIT;
+    }
+  }
+
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+      SubQuery subQuery = new SubQuery(query.context, query.getExecutionBlockCursor().nextBlock(),
+          query.sm);
+      subQuery.setPriority(query.priority--);
+      query.addSubQuery(subQuery);
+      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      subQuery.handle(new SubQueryEvent(subQuery.getId(),
+          SubQueryEventType.SQ_INIT));
+    }
+  }
+
+  /**
+   * Handles SUBQUERY_COMPLETED: schedules the next execution block after a successful
+   * subquery, finalizes the query after the last one, and fails it otherwise.
+   */
+  public static class SubQueryCompletedTransition implements
+      MultipleArcTransition<Query, QueryEvent, QueryState> {
+    @Override
+    public QueryState transition(Query query, QueryEvent event) {
+      // increase the count for completed subqueries
+      query.completedSubQueryCount++;
+      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+
+      // if the subquery is succeeded
+      if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
+        if (cursor.hasNext()) {
+          SubQuery nextSubQuery = new SubQuery(query.context, cursor.nextBlock(), query.sm);
+          nextSubQuery.setPriority(query.priority--);
+          query.addSubQuery(nextSubQuery);
+          nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
+              SubQueryEventType.SQ_INIT));
+          LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+          LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+          return query.checkQueryForCompleted();
+        } else { // Finish a query
+          if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
+            SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
+            TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
+                subQuery.getTableMeta(), query.context.getOutputPath());
+            query.setResultDesc(desc);
+            try {
+              query.writeStat(query.context.getOutputPath(), subQuery);
+            } catch (IOException e) {
+              LOG.error("Failed to write table meta for query " + query.getId(), e);
+            }
+            query.eventHandler.handle(new QueryFinishEvent(query.getId()));
+            if (query.context.isCreateTableQuery()) {
+              // TODO move to QueryMasterManager
+              //query.context.getCatalog().addTable(desc);
+            }
+          }
+          return query.finished(QueryState.QUERY_SUCCEEDED);
+        }
+      } else {
+        // A failed subquery fails the whole query; finished() records finish time and state.
+        return query.finished(QueryState.QUERY_FAILED);
+      }
+    }
+  }
+
+  private static class DiagnosticsUpdateTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
+          .getDiagnosticUpdate());
+    }
+  }
+
+  private static class InitCompleteTransition implements
+      SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      if (query.initializationTime == 0) {
+        query.setInitializationTime();
+      }
+    }
+  }
+
+  private static class InternalErrorTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.finished(QueryState.QUERY_ERROR);
+    }
+  }
+
+  public QueryState finished(QueryState finalState) {
+    setFinishTime();
+    context.setState(finalState);
+    return finalState;
+  }
+
+  /**
+   * Check if all subqueries of the query are completed
+   * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
+   */
+  QueryState checkQueryForCompleted() {
+    if (completedSubQueryCount == subqueries.size()) {
+      return QueryState.QUERY_SUCCEEDED;
+    }
+    return getState();
+  }
+
+
+  /**
+   * Applies the event to the query state machine while holding the write lock.
+   * An invalid transition is logged and reported as an INTERNAL_ERROR event.
+   */
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    // Acquire outside the try block so unlock() only runs if lock() succeeded.
+    writeLock.lock();
+    try {
+      QueryState oldState = getState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+      }
+      QueryState newState = getState();
+      if (oldState != newState) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + newState);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void writeStat(Path outputPath, SubQuery subQuery)
+      throws IOException {
+    ExecutionBlock execBlock = subQuery.getBlock();
+    sm.writeTableMeta(outputPath, subQuery.getTableMeta());
+  }
+}