Posted to commits@tajo.apache.org by hy...@apache.org on 2015/01/08 16:36:14 UTC

[06/13] tajo git commit: TAJO-1288: Refactoring org.apache.tajo.master package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
new file mode 100644
index 0000000..351856f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -0,0 +1,926 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.plan.serder.LogicalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto;
+import org.apache.tajo.storage.DataLocation;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class DefaultTaskScheduler extends AbstractTaskScheduler {
+  private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class);
+
+  private final TaskSchedulerContext context;
+  private Stage stage;
+
+  private Thread schedulingThread;
+  private AtomicBoolean stopEventHandling = new AtomicBoolean(false);
+
+  private ScheduledRequests scheduledRequests;
+  private TaskRequests taskRequests;
+
+  private int nextTaskId = 0;
+  private int scheduledObjectNum = 0;
+
+  public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) {
+    super(DefaultTaskScheduler.class.getName());
+    this.context = context;
+    this.stage = stage;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+
+    scheduledRequests = new ScheduledRequests();
+    taskRequests  = new TaskRequests();
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Start TaskScheduler");
+
+    this.schedulingThread = new Thread() {
+      public void run() {
+
+        while(!stopEventHandling.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            synchronized (schedulingThread){
+              schedulingThread.wait(100);
+            }
+            schedule();
+          } catch (InterruptedException e) {
+            break;
+          } catch (Throwable e) {
+            LOG.fatal(e.getMessage(), e);
+            break;
+          }
+        }
+        LOG.info("TaskScheduler schedulingThread stopped");
+      }
+    };
+
+    this.schedulingThread.start();
+    super.start();
+  }
+
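+  // A pre-built "stop" request: shouldDie=true tells a task runner to shut down instead of
+  // running a task. It is sent whenever the scheduler wants a runner to exit, e.g., on stop()
+  // or when the runner's container is gone.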
+  private static final TaskAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
+  static {
+    ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0);
+
+    TajoWorkerProtocol.TaskRequestProto.Builder builder =
+        TajoWorkerProtocol.TaskRequestProto.newBuilder();
+    builder.setId(NULL_ATTEMPT_ID.getProto());
+    builder.setShouldDie(true);
+    builder.setOutputTable("");
+    builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
+    builder.setClusteredOutput(false);
+    stopTaskRunnerReq = builder.build();
+  }
+
+  @Override
+  public void stop() {
+    if(stopEventHandling.getAndSet(true)){
+      return;
+    }
+
+    if (schedulingThread != null) {
+      synchronized (schedulingThread) {
+        schedulingThread.notifyAll();
+      }
+    }
+
+    // Respond to all pending request callbacks immediately.
+    if(taskRequests != null){
+      for (TaskRequestEvent req : taskRequests.taskRequestQueue) {
+        req.getCallback().run(stopTaskRunnerReq);
+      }
+    }
+
+    LOG.info("Task Scheduler stopped");
+    super.stop();
+  }
+
+  private Fragment[] fragmentsForNonLeafTask;
+  private Fragment[] broadcastFragmentsForNonLeafTask;
+
+  LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
+  public void schedule() {
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.leafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", LeafTask Schedule Request: " +
+            scheduledRequests.leafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.leafTaskNum());
+        LOG.debug("Get " + taskRequestEvents.size() + " taskRequestEvents ");
+        if (taskRequestEvents.size() > 0) {
+          scheduledRequests.assignToLeafTasks(taskRequestEvents);
+          taskRequestEvents.clear();
+        }
+      }
+    }
+
+    if (taskRequests.size() > 0) {
+      if (scheduledRequests.nonLeafTaskNum() > 0) {
+        LOG.debug("Try to schedule tasks with taskRequestEvents: " +
+            taskRequests.size() + ", NonLeafTask Schedule Request: " +
+            scheduledRequests.nonLeafTaskNum());
+        taskRequests.getTaskRequests(taskRequestEvents,
+            scheduledRequests.nonLeafTaskNum());
+        scheduledRequests.assignToNonLeafTasks(taskRequestEvents);
+        taskRequestEvents.clear();
+      }
+    }
+  }
+
+  @Override
+  public void handle(TaskSchedulerEvent event) {
+    if (event.getType() == EventType.T_SCHEDULE) {
+      if (event instanceof FragmentScheduleEvent) {
+        FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
+        if (context.isLeafQuery()) {
+          TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
+          Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++);
+          task.addFragment(castEvent.getLeftFragment(), true);
+          scheduledObjectNum++;
+          if (castEvent.hasRightFragments()) {
+            task.addFragments(castEvent.getRightFragments());
+          }
+          stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+        } else {
+          fragmentsForNonLeafTask = new FileFragment[2];
+          fragmentsForNonLeafTask[0] = castEvent.getLeftFragment();
+          if (castEvent.hasRightFragments()) {
+            FileFragment[] rightFragments = castEvent.getRightFragments().toArray(new FileFragment[]{});
+            fragmentsForNonLeafTask[1] = rightFragments[0];
+            if (rightFragments.length > 1) {
+              broadcastFragmentsForNonLeafTask = new FileFragment[rightFragments.length - 1];
+              System.arraycopy(rightFragments, 1, broadcastFragmentsForNonLeafTask, 0, broadcastFragmentsForNonLeafTask.length);
+            } else {
+              broadcastFragmentsForNonLeafTask = null;
+            }
+          }
+        }
+      } else if (event instanceof FetchScheduleEvent) {
+        FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
+        Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
+        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
+        Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++);
+        scheduledObjectNum++;
+        for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
+          task.addFetches(eachFetch.getKey(), eachFetch.getValue());
+          task.addFragment(fragmentsForNonLeafTask[0], true);
+          if (fragmentsForNonLeafTask[1] != null) {
+            task.addFragment(fragmentsForNonLeafTask[1], true);
+          }
+        }
+        if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) {
+          task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
+        }
+        stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
+      } else if (event instanceof TaskAttemptToSchedulerEvent) {
+        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
+        if (context.isLeafQuery()) {
+          scheduledRequests.addLeafTask(castEvent);
+        } else {
+          scheduledRequests.addNonLeafTask(castEvent);
+        }
+      }
+    } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+      // When a stage is killed, unassigned task attempts are canceled from the scheduler.
+      // This event is triggered by TaskAttempt.
+      TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
+      scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId());
+      LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+      ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle(
+          new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
+    }
+  }
+
+  @Override
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+
+    taskRequests.handle(event);
+    int hosts = scheduledRequests.leafTaskHostMapping.size();
+
+    // If the available cluster resources outnumber the remaining tasks, wake the scheduling thread immediately.
+    if(remainingScheduledObjectNum() > 0 &&
+        (remainingScheduledObjectNum() <= hosts || hosts <= taskRequests.size())){
+      synchronized (schedulingThread){
+        schedulingThread.notifyAll();
+      }
+    }
+  }
+
+  @Override
+  public int remainingScheduledObjectNum() {
+    return scheduledObjectNum;
+  }
+
+  private class TaskRequests implements EventHandler<TaskRequestEvent> {
+    private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
+        new LinkedBlockingQueue<TaskRequestEvent>();
+
+    @Override
+    public void handle(TaskRequestEvent event) {
+      if(LOG.isDebugEnabled()){
+        LOG.debug("TaskRequest: " + event.getContainerId() + "," + event.getExecutionBlockId());
+      }
+
+      if(stopEventHandling.get()) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
+      int qSize = taskRequestQueue.size();
+      if (qSize != 0 && qSize % 1000 == 0) {
+        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
+      }
+      int remCapacity = taskRequestQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.warn("Very low remaining capacity in the event-queue "
+            + "of DefaultTaskScheduler: " + remCapacity);
+      }
+
+      taskRequestQueue.add(event);
+    }
+
+    public void getTaskRequests(final Collection<TaskRequestEvent> taskRequests,
+                                int num) {
+      taskRequestQueue.drainTo(taskRequests, num);
+    }
+
+    public int size() {
+      return taskRequestQueue.size();
+    }
+  }
+
+  /**
+   * One worker can have multiple running task runners. <code>HostVolumeMapping</code>
+   * describes the scheduling state of one worker, including:
+   * <ul>
+   *  <li>host name</li>
+   *  <li>rack name</li>
+   *  <li>unassigned tasks for each disk volume</li>
+   *  <li>the last assigned volume id, used to assign tasks in a round-robin manner</li>
+   *  <li>the number of running tasks for each volume</li>
+   * </ul>
+   *
+   * Here, we identify a task runner by {@link ContainerId}, and we use volume ids to identify
+   * the disks in this node. A volume id only distinguishes one disk from another; we cannot
+   * tell which physical disk a given volume id refers to. See the section below for details.
+   *
+   * <h3>Volume id</h3>
+   * A volume id is an integer, and each one identifies a disk volume.
+   *
+   * Volume ids can be obtained from org.apache.hadoop.fs.BlockStorageLocation#getVolumeIds().
+   * HDFS may not provide volume ids, e.g., when the config 'dfs.client.file-block-locations.enabled'
+   * is disabled. In that case, the volume id will be -1 or another negative integer.
+   *
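+   * <h3>Example</h3>
+   * Suppose a host reports volumes {0, 1}, volume 0 is running one task, and volume 1 is
+   * running none. {@link #getLowestVolumeId()} then returns 1, so the next local task is
+   * drawn from volume 1, keeping the per-disk load balanced. (Illustrative numbers only.)
+   *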
+   * <h3>See Also</h3>
+   * <ul>
+   *   <li>HDFS-3672 (https://issues.apache.org/jira/browse/HDFS-3672).</li>
+   * </ul>
+   */
+  public class HostVolumeMapping {
+    private final String host;
+    private final String rack;
+    /** A key is a disk volume id, and a value is the list of tasks to be scheduled on that volume. */
+    private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume =
+        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>());
+    /** The last assigned volume id for each task runner */
+    private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
+      Integer>();
+    /**
+     * A key is a disk volume id, and a value is the load of that volume.
+     * The load is measured by counting how many tasks are currently running on the volume.
+     *
+     * These disk volumes are kept in ascending order of volume id.
+     * In other words, the head volume ids are likely to be -1, meaning no volume id was given.
+     */
+    private SortedMap<Integer, Integer> diskVolumeLoads = new TreeMap<Integer, Integer>();
+    /** The total number of remaining tasks on this host */
+    private AtomicInteger remainTasksNum = new AtomicInteger(0);
+    public static final int REMOTE = -2;
+
+
+    public HostVolumeMapping(String host, String rack){
+      this.host = host;
+      this.rack = rack;
+    }
+
+    public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){
+      synchronized (unassignedTaskForEachVolume){
+        LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+        if (list == null) {
+          list = new LinkedHashSet<TaskAttempt>();
+          unassignedTaskForEachVolume.put(volumeId, list);
+        }
+        list.add(attemptId);
+      }
+
+      remainTasksNum.incrementAndGet();
+
+      if(!diskVolumeLoads.containsKey(volumeId)) diskVolumeLoads.put(volumeId, 0);
+    }
+
+    /**
+     *  Assignment priorities:
+     *  1. a task on a local volume of this host
+     *  2. a task whose block location is unknown, or a non-splittable task on this host
+     *  3. remote tasks. unassignedTaskForEachVolume contains only local tasks, so for remote
+     *     requests it holds no matching entry.
+     */
+    public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) {
+      int volumeId;
+      TaskAttemptId taskAttemptId = null;
+
+      if (!lastAssignedVolumeId.containsKey(containerId)) {
+        volumeId = getLowestVolumeId();
+        increaseConcurrency(containerId, volumeId);
+      } else {
+        volumeId = lastAssignedVolumeId.get(containerId);
+      }
+
+      if (unassignedTaskForEachVolume.size() >  0) {
+        int retry = unassignedTaskForEachVolume.size();
+        do {
+          //clean and get a remaining local task
+          taskAttemptId = getAndRemove(volumeId);
+          if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
+            decreaseConcurrency(containerId);
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
+          }
+
+          if (taskAttemptId == null) {
+            //reassign next volume
+            volumeId = getLowestVolumeId();
+            increaseConcurrency(containerId, volumeId);
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      } else {
+        this.remainTasksNum.set(0);
+      }
+      return taskAttemptId;
+    }
+
+    public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) {
+      TaskAttemptId taskAttemptId = null;
+
+      if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
+        int retry = unassignedTaskForEachVolume.size();
+        do {
+          //clean and get a remaining task
+          int volumeId = getLowestVolumeId();
+          taskAttemptId = getAndRemove(volumeId);
+          if (taskAttemptId == null) {
+            if (volumeId > REMOTE) {
+              diskVolumeLoads.remove(volumeId);
+            }
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      }
+      return taskAttemptId;
+    }
+
+    private synchronized TaskAttemptId getAndRemove(int volumeId){
+      TaskAttemptId taskAttemptId = null;
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId;
+
+      LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+      if(list != null && list.size() > 0){
+        TaskAttempt taskAttempt;
+        synchronized (unassignedTaskForEachVolume) {
+          Iterator<TaskAttempt> iterator = list.iterator();
+          taskAttempt = iterator.next();
+          iterator.remove();
+        }
+
+        this.remainTasksNum.getAndDecrement();
+        taskAttemptId = taskAttempt.getId();
+        for (DataLocation location : taskAttempt.getTask().getDataLocations()) {
+          if (!this.getHost().equals(location.getHost())) {
+            HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
+            if (volumeMapping != null) {
+              volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt);
+            }
+          }
+        }
+      }
+
+      if(list == null || list.isEmpty()) {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+      return taskAttemptId;
+    }
+
+    private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
+
+      LinkedHashSet<TaskAttempt> tasks  = unassignedTaskForEachVolume.get(volumeId);
+
+      if(tasks != null && tasks.size() > 0){
+        tasks.remove(taskAttempt);
+        remainTasksNum.getAndDecrement();
+      } else {
+        unassignedTaskForEachVolume.remove(volumeId);
+      }
+    }
+
+    /**
+     * Increase the count of running tasks and disk loads for a certain task runner.
+     *
+     * @param containerId The task runner identifier
+     * @param volumeId Volume identifier
+     * @return the volume load (i.e., how many running tasks use this volume)
+     */
+    private synchronized int increaseConcurrency(TajoContainerId containerId, int volumeId) {
+
+      int concurrency = 1;
+      if (diskVolumeLoads.containsKey(volumeId)) {
+        concurrency = diskVolumeLoads.get(volumeId) + 1;
+      }
+
+      if (volumeId > -1) {
+        LOG.info("Assigned host : " + host + ", Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == -1) {
+        // the volume id is unknown: block metadata is disabled on the namenode, or the input is a compressed text file or resides on Amazon S3
+        LOG.info("Assigned host : " + host + ", Unknown Volume : " + volumeId + ", Concurrency : " + concurrency);
+      } else if (volumeId == REMOTE) {
+        // all local blocks on this host have been processed, so this task runner will be assigned remote tasks
+        LOG.info("Assigned host : " + host + ", Remaining local tasks : " + getRemainingLocalTaskSize()
+            + ", Remote Concurrency : " + concurrency);
+      }
+      diskVolumeLoads.put(volumeId, concurrency);
+      lastAssignedVolumeId.put(containerId, volumeId);
+      return concurrency;
+    }
+
+    /**
+     * Decrease the count of running tasks of a certain task runner
+     */
+    private synchronized void decreaseConcurrency(TajoContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId != null && diskVolumeLoads.containsKey(volumeId)){
+        Integer concurrency = diskVolumeLoads.get(volumeId);
+        if(concurrency > 0){
+          diskVolumeLoads.put(volumeId, concurrency - 1);
+        } else {
+          if (volumeId > REMOTE) {
+            diskVolumeLoads.remove(volumeId);
+          }
+        }
+      }
+      lastAssignedVolumeId.remove(containerId);
+    }
+
+    /**
+     *  local volume of a host : 0 ~ n
+     *  compressed file, Amazon S3, or unknown volume : -1
+     *  remote task : -2
+     */
+    public int getLowestVolumeId(){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : diskVolumeLoads.entrySet()) {
+        if(volumeEntry == null) volumeEntry = entry;
+
+        if (volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if(volumeEntry != null){
+        return volumeEntry.getKey();
+      } else {
+        return REMOTE;
+      }
+    }
+
+    public boolean isAssigned(TajoContainerId containerId){
+      return lastAssignedVolumeId.containsKey(containerId);
+    }
+
+    public boolean isRemote(TajoContainerId containerId){
+      Integer volumeId = lastAssignedVolumeId.get(containerId);
+      if(volumeId == null || volumeId > REMOTE){
+        return false;
+      } else {
+        return true;
+      }
+    }
+
+    public int getRemoteConcurrency(){
+      return getVolumeConcurrency(REMOTE);
+    }
+
+    public int getVolumeConcurrency(int volumeId){
+      Integer size = diskVolumeLoads.get(volumeId);
+      if(size == null) return 0;
+      else return size;
+    }
+
+    public int getRemainingLocalTaskSize(){
+      return remainTasksNum.get();
+    }
+
+    public String getHost() {
+
+      return host;
+    }
+
+    public String getRack() {
+      return rack;
+    }
+  }
+
+  private class ScheduledRequests {
+    // The two sets leafTasks and nonLeafTasks keep all tasks to be scheduled. Even if a task is
+    // included in leafTaskHostMapping or leafTasksRackMapping, it will not be sent to a task runner
+    // unless it is also present in leafTasks or nonLeafTasks.
+    private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
+    private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
+    private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
+    private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
+
+    private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
+      TaskAttempt taskAttempt = event.getTaskAttempt();
+      List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
+
+      for (DataLocation location : locations) {
+        String host = location.getHost();
+
+        HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+        if (hostVolumeMapping == null) {
+          String rack = RackResolver.resolve(host).getNetworkLocation();
+          hostVolumeMapping = new HostVolumeMapping(host, rack);
+          leafTaskHostMapping.put(host, hostVolumeMapping);
+        }
+        hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to host " + host);
+        }
+
+        HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
+        if (list == null) {
+          list = new HashSet<TaskAttemptId>();
+          leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
+        }
+
+        list.add(taskAttempt.getId());
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
+        }
+      }
+
+      leafTasks.add(taskAttempt.getId());
+    }
+
+    private void addNonLeafTask(TaskAttemptToSchedulerEvent event) {
+      nonLeafTasks.add(event.getTaskAttempt().getId());
+    }
+
+    public int leafTaskNum() {
+      return leafTasks.size();
+    }
+
+    public int nonLeafTaskNum() {
+      return nonLeafTasks.size();
+    }
+
+    public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
+
+    private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){
+      HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+      if (hostVolumeMapping != null) { // the tajo worker runs on a hadoop datanode
+        for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
+          TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+
+          if(attemptId == null) break;
+          //find remaining local task
+          if (leafTasks.contains(attemptId)) {
+            leafTasks.remove(attemptId);
+            //LOG.info(attemptId + " Assigned based on host match " + hostName);
+            hostLocalAssigned++;
+            totalAssigned++;
+            return attemptId;
+          }
+        }
+      }
+      return null;
+    }
+
+    private TaskAttemptId allocateRackTask(String host) {
+
+      List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
+      String rack = RackResolver.resolve(host).getNetworkLocation();
+      TaskAttemptId attemptId = null;
+
+      if (remainingTasks.size() > 0) {
+        synchronized (scheduledRequests) {
+          // sort hosts so that the host with the most remaining tasks comes first
+          Collections.sort(remainingTasks, new Comparator<HostVolumeMapping>() {
+            @Override
+            public int compare(HostVolumeMapping v1, HostVolumeMapping v2) {
+              // descending remaining tasks
+              if (v2.remainTasksNum.get() > v1.remainTasksNum.get()) {
+                return 1;
+              } else if (v2.remainTasksNum.get() == v1.remainTasksNum.get()) {
+                return 0;
+              } else {
+                return -1;
+              }
+            }
+          });
+        }
+
+        for (HostVolumeMapping tasks : remainingTasks) {
+          for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
+            TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack);
+
+            if (tId == null) break;
+
+            if (leafTasks.contains(tId)) {
+              leafTasks.remove(tId);
+              attemptId = tId;
+              break;
+            }
+          }
+          if(attemptId != null) break;
+        }
+      }
+
+      //find task in rack
+      if (attemptId == null) {
+        HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack);
+        if (list != null) {
+          synchronized (list) {
+            Iterator<TaskAttemptId> iterator = list.iterator();
+            while (iterator.hasNext()) {
+              TaskAttemptId tId = iterator.next();
+              iterator.remove();
+              if (leafTasks.contains(tId)) {
+                leafTasks.remove(tId);
+                attemptId = tId;
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      if (attemptId != null) {
+        rackLocalAssigned++;
+        totalAssigned++;
+
+        LOG.info(String.format("Assigned Local/Rack/Total: (%d/%d/%d), Locality: %.2f%%, Rack host: %s",
+            hostLocalAssigned, rackLocalAssigned, totalAssigned,
+            ((double) hostLocalAssigned / (double) totalAssigned) * 100, host));
+
+      }
+      return attemptId;
+    }
+
+    public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
+      LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();
+
+      TaskRequestEvent taskRequest;
+      while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
+        taskRequest = taskRequests.pollFirst();
+        if(taskRequest == null) { // if there are only remote task requests
+          taskRequest = remoteTaskRequests.pollFirst();
+        }
+
+        // checking if this container is still alive.
+        // If not, ignore the task request and stop the task runner
+        ContainerProxy container = context.getMasterContext().getResourceAllocator()
+            .getContainer(taskRequest.getContainerId());
+        if(container == null) {
+          taskRequest.getCallback().run(stopTaskRunnerReq);
+          continue;
+        }
+
+        // get the hostname of the requesting worker
+        WorkerConnectionInfo connectionInfo =
+            context.getMasterContext().getResourceAllocator().getWorkerConnectionInfo(taskRequest.getWorkerId());
+        String host = connectionInfo.getHost();
+
+        // if no worker in the host mapping matches the hostname of the task request
+        if(!leafTaskHostMapping.containsKey(host)){
+          String normalizedHost = NetUtils.normalizeHost(host);
+
+          if(!leafTaskHostMapping.containsKey(normalizedHost) && !taskRequests.isEmpty()){
+            // This means one of two cases:
+            // * no blocks reside on this node, or
+            // * all blocks residing on this node have been consumed, and this task runner is requesting a remote task.
+            // In either case, move the task request to the remote task request list and skip the steps below.
+            remoteTaskRequests.add(taskRequest);
+            continue;
+          }
+        }
+
+        TajoContainerId containerId = taskRequest.getContainerId();
+        LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
+            "containerId=" + containerId);
+
+        //////////////////////////////////////////////////////////////////////
+        // disk or host-local allocation
+        //////////////////////////////////////////////////////////////////////
+        TaskAttemptId attemptId = allocateLocalTask(host, containerId);
+
+        if (attemptId == null) { // if a local task cannot be found
+          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
+
+          if(hostVolumeMapping != null) {
+            if(!hostVolumeMapping.isRemote(containerId)){
+              // assign to remote volume
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
+            }
+            // manage the remote concurrency of the tail tasks
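+            // e.g., with 40 remaining tasks and 10 hosts: tailLimit = max(40 / (10 * 2), 1) = 2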
+            int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
+
+            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
+              //release container
+              hostVolumeMapping.decreaseConcurrency(containerId);
+              taskRequest.getCallback().run(stopTaskRunnerReq);
+              continue;
+            }
+          }
+
+          //////////////////////////////////////////////////////////////////////
+          // rack-local allocation
+          //////////////////////////////////////////////////////////////////////
+          attemptId = allocateRackTask(host);
+
+          //////////////////////////////////////////////////////////////////////
+          // random node allocation
+          //////////////////////////////////////////////////////////////////////
+          if (attemptId == null && leafTaskNum() > 0) {
+            synchronized (leafTasks){
+              attemptId = leafTasks.iterator().next();
+              leafTasks.remove(attemptId);
+              rackLocalAssigned++;
+              totalAssigned++;
+              LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
+                  hostLocalAssigned, rackLocalAssigned, totalAssigned,
+                  ((double) hostLocalAssigned / (double) totalAssigned) * 100));
+            }
+          }
+        }
+
+        if (attemptId != null) {
+          Task task = stage.getTask(attemptId.getTaskId());
+          TaskRequest taskAssign = new TaskRequestImpl(
+              attemptId,
+              new ArrayList<FragmentProto>(task.getAllFragments()),
+              "",
+              false,
+              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
+              context.getMasterContext().getQueryContext(),
+              stage.getDataChannel(), stage.getBlock().getEnforcer());
+          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(), connectionInfo));
+          assignedRequest.add(attemptId);
+
+          scheduledObjectNum--;
+          taskRequest.getCallback().run(taskAssign.getProto());
+        } else {
+          throw new RuntimeException("Illegal state: a task request remained but no task attempt could be assigned");
+        }
+      }
+    }
+
+    private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+      if (masterPlan.isRoot(block)) {
+        return false;
+      }
+
+      ExecutionBlock parent = masterPlan.getParent(block);
+      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+        return false;
+      }
+
+      return true;
+    }
+
+    public void assignToNonLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
+      Collections.shuffle(taskRequests);
+
+      TaskRequestEvent taskRequest;
+      while (!taskRequests.isEmpty()) {
+        taskRequest = taskRequests.pollFirst();
+        LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
+
+        TaskAttemptId attemptId;
+        // random allocation
+        if (nonLeafTasks.size() > 0) {
+          synchronized (nonLeafTasks){
+            attemptId = nonLeafTasks.iterator().next();
+            nonLeafTasks.remove(attemptId);
+          }
+          LOG.debug("Assigned based on * match");
+
+          Task task;
+          task = stage.getTask(attemptId.getTaskId());
+          TaskRequest taskAssign = new TaskRequestImpl(
+              attemptId,
+              Lists.newArrayList(task.getAllFragments()),
+              "",
+              false,
+              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
+              context.getMasterContext().getQueryContext(),
+              stage.getDataChannel(),
+              stage.getBlock().getEnforcer());
+          if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
+            taskAssign.setInterQuery();
+          }
+          for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) {
+            Collection<FetchImpl> fetches = entry.getValue();
+            if (fetches != null) {
+              for (FetchImpl fetch : fetches) {
+                taskAssign.addFetch(entry.getKey(), fetch);
+              }
+            }
+          }
+
+          WorkerConnectionInfo connectionInfo = context.getMasterContext().getResourceAllocator().
+              getWorkerConnectionInfo(taskRequest.getWorkerId());
+          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
+              taskRequest.getContainerId(), connectionInfo));
+          taskRequest.getCallback().run(taskAssign.getProto());
+          totalAssigned++;
+          scheduledObjectNum--;
+        }
+      }
+    }
+  }
+}
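
The volume-balancing rule above (HostVolumeMapping#getLowestVolumeId) always hands out the
least-loaded volume, falling back to REMOTE (-2) when no volume is tracked. Below is a
minimal, self-contained sketch of that rule, assuming only the JDK; the class name
VolumeLoadSketch and its methods are illustrative, not part of this commit:

    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class VolumeLoadSketch {
      static final int REMOTE = -2;  // pseudo volume id for remote reads

      // volume id -> number of running tasks, iterated in ascending volume-id order
      private final SortedMap<Integer, Integer> loads = new TreeMap<Integer, Integer>();

      void addVolume(int volumeId) {
        if (!loads.containsKey(volumeId)) loads.put(volumeId, 0);
      }

      // Same selection rule as HostVolumeMapping#getLowestVolumeId: keep the entry with
      // the lowest load; on ties the later entry wins, so the unknown (-1) and REMOTE (-2)
      // pseudo volumes are chosen only when strictly less loaded than every real disk.
      int lowestLoadedVolume() {
        Map.Entry<Integer, Integer> best = null;
        for (Map.Entry<Integer, Integer> e : loads.entrySet()) {
          if (best == null || best.getValue() >= e.getValue()) {
            best = e;
          }
        }
        return best != null ? best.getKey() : REMOTE;
      }

      void assign(int volumeId) {   // a task starts reading from this volume
        addVolume(volumeId);
        loads.put(volumeId, loads.get(volumeId) + 1);
      }

      void release(int volumeId) {  // a task on this volume finishes
        Integer n = loads.get(volumeId);
        if (n != null && n > 0) loads.put(volumeId, n - 1);
      }

      public static void main(String[] args) {
        VolumeLoadSketch host = new VolumeLoadSketch();
        host.addVolume(0);
        host.addVolume(1);
        host.assign(0);                                 // volume 0 now runs one task
        System.out.println(host.lowestLoadedVolume());  // prints 1 (the idle volume)
      }
    }

In the scheduler itself the same load map also tracks REMOTE assignments, which is how
decreaseConcurrency() followed by increaseConcurrency(containerId, REMOTE) moves a container
from local to remote work without any extra bookkeeping.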

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
new file mode 100644
index 0000000..5fe2f80
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.event.TaskSchedulerEvent;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.List;
+import java.util.Map;
+
+public class FetchScheduleEvent extends TaskSchedulerEvent {
+  private final Map<String, List<FetchImpl>> fetches;
+
+  public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
+                            final Map<String, List<FetchImpl>> fetches) {
+    super(eventType, blockId);
+    this.fetches = fetches;
+  }
+
+  public Map<String, List<FetchImpl>> getFetches() {
+    return fetches;
+  }
+}
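
FetchScheduleEvent is just a carrier: it hands a map of fetch lists, keyed by name, to the
scheduler, whose FetchScheduleEvent branch in DefaultTaskScheduler#handle builds one task,
attaches every fetch, and fires TaskEventType.T_SCHEDULE for it. A hedged usage sketch,
assuming a live scheduler and fetches built elsewhere in the query master; the wrapper class
and method name are illustrative only:

    package org.apache.tajo.querymaster;

    import java.util.List;
    import java.util.Map;

    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
    import org.apache.tajo.worker.FetchImpl;

    class FetchScheduleExample {
      // Wraps the fetches for one execution block in a FetchScheduleEvent and hands it
      // to the scheduler, which turns them into a single scheduled task.
      static void scheduleFetches(DefaultTaskScheduler scheduler, ExecutionBlockId ebId,
                                  Map<String, List<FetchImpl>> fetchesByName) {
        scheduler.handle(new FetchScheduleEvent(EventType.T_SCHEDULE, ebId, fetchesByName));
      }
    }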

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
new file mode 100644
index 0000000..2932694
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.*;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.history.QueryHistory;
+import org.apache.tajo.util.history.StageHistory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class Query implements EventHandler<QueryEvent> {
+  private static final Log LOG = LogFactory.getLog(Query.class);
+
+  // Facilities for Query
+  private final TajoConf systemConf;
+  private final Clock clock;
+  private String queryStr;
+  private Map<ExecutionBlockId, Stage> stages;
+  private final EventHandler eventHandler;
+  private final MasterPlan plan;
+  QueryMasterTask.QueryMasterTaskContext context;
+  private ExecutionBlockCursor cursor;
+
+  // Query Status
+  private final QueryId id;
+  private long appSubmitTime;
+  private long startTime;
+  private long finishTime;
+  private TableDesc resultDesc;
+  private int completedStagesCount = 0;
+  private int successedStagesCount = 0;
+  private int killedStagesCount = 0;
+  private int failedStagesCount = 0;
+  private int erroredStagesCount = 0;
+  private final List<String> diagnostics = new ArrayList<String>();
+
+  // Internal Variables
+  private final Lock readLock;
+  private final Lock writeLock;
+  private int priority = 100;
+
+  // State Machine
+  private final StateMachine<QueryState, QueryEventType, QueryEvent> stateMachine;
+  private QueryState queryState;
+
+  // Transition Handler
+  private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
+  private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition();
+  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
+
+  protected static final StateMachineFactory
+      <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
+      new StateMachineFactory<Query, QueryState, QueryEventType, QueryEvent>
+          (QueryState.QUERY_NEW)
+
+          // Transitions from NEW state
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_RUNNING,
+              QueryEventType.START,
+              new StartTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+              QueryEventType.KILL,
+              new KillNewQueryTransition())
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from RUNNING state
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.KILL,
+              new KillAllStagesTransition())
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from QUERY_SUCCEEDED state
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          // ignore-able transitions
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED,
+              QueryEventType.KILL)
+          .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+
+          // Transitions from KILL_WAIT state
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.STAGE_COMPLETED,
+              STAGE_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_KILLED),
+              QueryEventType.KILL,
+              QUERY_COMPLETED_TRANSITION)
+
+          // Transitions from FAILED state
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.KILL)
+
+          // Transitions from ERROR state
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED))
+
+          .installTopology();
+
+  public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
+               final long appSubmitTime,
+               final String queryStr,
+               final EventHandler eventHandler,
+               final MasterPlan plan) {
+    this.context = context;
+    this.systemConf = context.getConf();
+    this.id = id;
+    this.clock = context.getClock();
+    this.appSubmitTime = appSubmitTime;
+    this.queryStr = queryStr;
+    this.stages = Maps.newConcurrentMap();
+    this.eventHandler = eventHandler;
+    this.plan = plan;
+    this.cursor = new ExecutionBlockCursor(plan, true);
+
+    StringBuilder sb = new StringBuilder("\n=======================================================");
+    sb.append("\nThe order of execution: \n");
+    int order = 1;
+    while (cursor.hasNext()) {
+      ExecutionBlock currentEB = cursor.nextBlock();
+      sb.append("\n").append(order).append(": ").append(currentEB.getId());
+      order++;
+    }
+    sb.append("\n=======================================================");
+    LOG.info(sb);
+    cursor.reset();
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    this.readLock = readWriteLock.readLock();
+    this.writeLock = readWriteLock.writeLock();
+
+    stateMachine = stateMachineFactory.make(this);
+    queryState = stateMachine.getCurrentState();
+  }
+
+  public float getProgress() {
+    QueryState state = getState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      return 1.0f;
+    } else {
+      int idx = 0;
+      List<Stage> tempStages = new ArrayList<Stage>();
+      synchronized(stages) {
+        tempStages.addAll(stages.values());
+      }
+
+      float [] subProgresses = new float[tempStages.size()];
+      for (Stage stage: tempStages) {
+        if (stage.getState() != StageState.NEW) {
+          subProgresses[idx] = stage.getProgress();
+        } else {
+          subProgresses[idx] = 0.0f;
+        }
+        idx++;
+      }
+
+      float totalProgress = 0.0f;
+      float proportion = 1.0f / (float)(getExecutionBlockCursor().size() - 1); // minus one is due to the root block, which is not executed as a stage
+
+      for (int i = 0; i < subProgresses.length; i++) {
+        totalProgress += subProgresses[i] * proportion;
+      }
+
+      return totalProgress;
+    }
+  }
+
+  public long getAppSubmitTime() {
+    return this.appSubmitTime;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime() {
+    startTime = clock.getTime();
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime() {
+    finishTime = clock.getTime();
+  }
+
+  public QueryHistory getQueryHistory() {
+    QueryHistory queryHistory = makeQueryHistory();
+    queryHistory.setStageHistories(makeStageHistories());
+    return queryHistory;
+  }
+
+  private List<StageHistory> makeStageHistories() {
+    List<StageHistory> stageHistories = new ArrayList<StageHistory>();
+    for(Stage eachStage : getStages()) {
+      stageHistories.add(eachStage.getStageHistory());
+    }
+
+    return stageHistories;
+  }
+
+  private QueryHistory makeQueryHistory() {
+    QueryHistory queryHistory = new QueryHistory();
+
+    queryHistory.setQueryId(getId().toString());
+    queryHistory.setQueryMaster(context.getQueryMasterContext().getWorkerContext().getWorkerName());
+    queryHistory.setHttpPort(context.getQueryMasterContext().getWorkerContext().getConnectionInfo().getHttpInfoPort());
+    queryHistory.setLogicalPlan(plan.getLogicalPlan().toString());
+    queryHistory.setDistributedPlan(plan.toString());
+
+    List<String[]> sessionVariables = new ArrayList<String[]>();
+    for(Map.Entry<String,String> entry: plan.getContext().getAllKeyValus().entrySet()) {
+      if (SessionVars.exists(entry.getKey()) && SessionVars.isPublic(SessionVars.get(entry.getKey()))) {
+        sessionVariables.add(new String[]{entry.getKey(), entry.getValue()});
+      }
+    }
+    queryHistory.setSessionVariables(sessionVariables);
+
+    return queryHistory;
+  }
+
+  public List<String> getDiagnostics() {
+    readLock.lock();
+    try {
+      return diagnostics;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected void addDiagnostic(String diag) {
+    diagnostics.add(diag);
+  }
+
+  public TableDesc getResultDesc() {
+    return resultDesc;
+  }
+
+  public void setResultDesc(TableDesc desc) {
+    resultDesc = desc;
+  }
+
+  public MasterPlan getPlan() {
+    return plan;
+  }
+
+  public StateMachine<QueryState, QueryEventType, QueryEvent> getStateMachine() {
+    return stateMachine;
+  }
+  
+  public void addStage(Stage stage) {
+    stages.put(stage.getId(), stage);
+  }
+  
+  public QueryId getId() {
+    return this.id;
+  }
+
+  public Stage getStage(ExecutionBlockId id) {
+    return this.stages.get(id);
+  }
+
+  public Collection<Stage> getStages() {
+    return this.stages.values();
+  }
+
+  public QueryState getSynchronizedState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /* non-blocking call for client API */
+  public QueryState getState() {
+    return queryState;
+  }
+
+  public ExecutionBlockCursor getExecutionBlockCursor() {
+    return cursor;
+  }
+
+  public static class StartTransition
+      implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent queryEvent) {
+
+      query.setStartTime();
+      Stage stage = new Stage(query.context, query.getPlan(),
+          query.getExecutionBlockCursor().nextBlock());
+      stage.setPriority(query.priority--);
+      query.addStage(stage);
+
+      stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT));
+      LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan());
+    }
+  }
+
+  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      QueryCompletedEvent stageEvent = (QueryCompletedEvent) queryEvent;
+      QueryState finalState;
+
+      if (stageEvent.getState() == StageState.SUCCEEDED) {
+        finalState = finalizeQuery(query, stageEvent);
+      } else if (stageEvent.getState() == StageState.FAILED) {
+        finalState = QueryState.QUERY_FAILED;
+      } else if (stageEvent.getState() == StageState.KILLED) {
+        finalState = QueryState.QUERY_KILLED;
+      } else {
+        finalState = QueryState.QUERY_ERROR;
+      }
+      if (finalState != QueryState.QUERY_SUCCEEDED) {
+        Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
+        if (lastStage != null && lastStage.getTableMeta() != null) {
+          StoreType storeType = lastStage.getTableMeta().getStoreType();
+          if (storeType != null) {
+            LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+            try {
+              StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+            } catch (IOException e) {
+              LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+            }
+          }
+        }
+      }
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+      query.setFinishTime();
+
+      return finalState;
+    }
+
+    private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
+      Stage lastStage = query.getStage(event.getExecutionBlockId());
+      StoreType storeType = lastStage.getTableMeta().getStoreType();
+      try {
+        LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+        CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+        TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+
+        Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+            .commitOutputData(query.context.getQueryContext(),
+                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
+
+        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+      } catch (Exception e) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
+        return QueryState.QUERY_ERROR;
+      }
+
+      return QueryState.QUERY_SUCCEEDED;
+    }
+
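+    /**
+     * A result-finalization hook: isEligible() decides whether the hook
+     * applies to the completed query, and execute() then builds or updates
+     * the result table description.
+     */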
+    private interface QueryHook {
+      boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
+      void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
+                   ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception;
+    }
+
+    private class QueryHookExecutor {
+      private List<QueryHook> hookList = TUtil.newList();
+      private QueryMaster.QueryMasterContext context;
+
+      public QueryHookExecutor(QueryMaster.QueryMasterContext context) {
+        this.context = context;
+        hookList.add(new MaterializedResultHook());
+        hookList.add(new CreateTableHook());
+        hookList.add(new InsertTableHook());
+      }
+
+      public void execute(QueryContext queryContext, Query query,
+                          ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        for (QueryHook hook : hookList) {
+          if (hook.isEligible(queryContext, query, finalExecBlockId, finalOutputDir)) {
+            hook.execute(context, queryContext, query, finalExecBlockId, finalOutputDir);
+          }
+        }
+      }
+    }
+
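+    /** Registers the result of a plain SELECT as an external table named after the query id. */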
+    private class MaterializedResultHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        NodeType type = lastStage.getBlock().getPlan().getType();
+        return type != NodeType.CREATE_TABLE && type != NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId,
+                          Path finalOutputDir) throws Exception {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+
+        String nullChar = queryContext.get(SessionVars.NULL_CHAR);
+        meta.putOption(StorageConstants.TEXT_NULL, nullChar);
+
+        TableStats stats = lastStage.getResultStats();
+
+        TableDesc resultTableDesc =
+            new TableDesc(
+                query.getId().toString(),
+                lastStage.getSchema(),
+                meta,
+                finalOutputDir.toUri());
+        resultTableDesc.setExternal(true);
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        resultTableDesc.setStats(stats);
+        query.setResultDesc(resultTableDesc);
+      }
+    }
+
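+    /** Finalizes CREATE TABLE AS queries: builds the new table's descriptor and registers it in the catalog. */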
+    private class CreateTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_TABLE;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        Stage lastStage = query.getStage(finalExecBlockId);
+        TableStats stats = lastStage.getResultStats();
+
+        CreateTableNode createTableNode = (CreateTableNode) lastStage.getBlock().getPlan();
+        TableMeta meta = new TableMeta(createTableNode.getStorageType(), createTableNode.getOptions());
+
+        TableDesc tableDescToBeCreated =
+            new TableDesc(
+                createTableNode.getTableName(),
+                createTableNode.getTableSchema(),
+                meta,
+                finalOutputDir.toUri());
+        tableDescToBeCreated.setExternal(createTableNode.isExternal());
+
+        if (createTableNode.hasPartition()) {
+          tableDescToBeCreated.setPartitionMethod(createTableNode.getPartitionMethod());
+        }
+
+        stats.setNumBytes(getTableVolume(query.systemConf, finalOutputDir));
+        tableDescToBeCreated.setStats(stats);
+        query.setResultDesc(tableDescToBeCreated);
+
+        catalog.createTable(tableDescToBeCreated);
+      }
+    }
+
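+    /**
+     * Finalizes INSERT queries: updates the target table's stats in the
+     * catalog, or describes the result under the query id when there is
+     * no target table.
+     */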
+    private class InsertTableHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId,
+                                Path finalOutputDir) {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        return lastStage.getBlock().getPlan().getType() == NodeType.INSERT;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext,
+                          Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir)
+          throws Exception {
+
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        Stage lastStage = query.getStage(finalExecBlockId);
+        TableMeta meta = lastStage.getTableMeta();
+        TableStats stats = lastStage.getResultStats();
+
+        InsertNode insertNode = (InsertNode) lastStage.getBlock().getPlan();
+
+        TableDesc finalTable;
+        if (insertNode.hasTargetTable()) {
+          String tableName = insertNode.getTableName();
+          finalTable = catalog.getTableDesc(tableName);
+        } else {
+          String tableName = query.getId().toString();
+          finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri());
+        }
+
+        long volume = getTableVolume(query.systemConf, finalOutputDir);
+        stats.setNumBytes(volume);
+        finalTable.setStats(stats);
+
+        if (insertNode.hasTargetTable()) {
+          UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
+          builder.setTableName(finalTable.getName());
+          builder.setStats(stats.getProto());
+
+          catalog.updateTableStats(builder.build());
+        }
+
+        query.setResultDesc(finalTable);
+      }
+    }
+  }
+
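+  /** Returns the total length in bytes of all files under the given table path. */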
+  public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+    ContentSummary directorySummary = fs.getContentSummary(tablePath);
+    return directorySummary.getLength();
+  }
+
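+  /**
+   * Handles a stage reaching a terminal state: updates the per-state
+   * counters, then either schedules the next execution block or reports
+   * the query as completed.
+   */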
+  public static class StageCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    private void executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock);
+      nextStage.setPriority(query.priority--);
+      query.addStage(nextStage);
+      nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT));
+
+      LOG.info("Scheduling Stage:" + nextStage.getId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority());
+        LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan());
+      }
+    }
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      try {
+        query.completedStagesCount++;
+        StageCompletedEvent castEvent = (StageCompletedEvent) event;
+
+        if (castEvent.getState() == StageState.SUCCEEDED) {
+          query.successedStagesCount++;
+        } else if (castEvent.getState() == StageState.KILLED) {
+          query.killedStagesCount++;
+        } else if (castEvent.getState() == StageState.FAILED) {
+          query.failedStagesCount++;
+        } else if (castEvent.getState() == StageState.ERROR) {
+          query.erroredStagesCount++;
+        } else {
+          LOG.error(String.format("Invalid Stage (%s) State %s at %s",
+              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name()));
+          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+        }
+
+        // if the latest stage succeeded and the query is still running
+        if (castEvent.getState() == StageState.SUCCEEDED &&              // the latest stage succeeded
+            query.getSynchronizedState() == QueryState.QUERY_RUNNING &&  // not in KILL_WAIT, FAILED, or ERROR
+            hasNext(query)) {                                            // at least one stage remains
+          query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport();
+          executeNextBlock(query);
+        } else { // the query has completed: finished, killed, failed, or errored
+          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+      }
+    }
+  }
+
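+  /** Records a diagnostic message carried by a QueryDiagnosticsUpdateEvent. */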
+  private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate());
+    }
+  }
+
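+  /** Kills a query before any stage has started: only the finish time and completion event are needed. */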
+  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
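+  /** Propagates a kill request by sending SQ_KILL to every stage created so far. */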
+  private static class KillAllStagesTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      synchronized (query.stages) {
+        for (Stage stage : query.stages.values()) {
+          query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL));
+        }
+      }
+    }
+  }
+
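+  /** Finishes the query immediately after an internal error. */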
+  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+    }
+  }
+
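+  /**
+   * Single entry point for query events: applies the state-machine transition
+   * under the write lock and caches the new state for non-blocking reads.
+   */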
+  @Override
+  public void handle(QueryEvent event) {
+    LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
+    writeLock.lock();
+    try {
+      QueryState oldState = getSynchronizedState();
+      try {
+        getStateMachine().doTransition(event.getType(), event);
+        queryState = getSynchronizedState();
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state"
+            + ", type:" + event
+            + ", oldState:" + oldState.name()
+            + ", nextState:" + getSynchronizedState().name()
+            , e);
+        eventHandler.handle(new QueryEvent(this.id, QueryEventType.INTERNAL_ERROR));
+      }
+
+      // log the state transition if the event changed the state
+      if (oldState != getSynchronizedState()) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + getSynchronizedState());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..bda2ec1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.session.Session;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
+
+public class QueryInProgress extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private QueryId queryId;
+
+  private Session session;
+
+  private AsyncDispatcher dispatcher;
+
+  private LogicalRootNode plan;
+
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private NettyClientBase queryMasterRpc;
+
+  private QueryMasterProtocolService queryMasterRpcClient;
+
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      Session session,
+      QueryContext queryContext,
+      QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) {
+    super(QueryInProgress.class.getName());
+    this.masterContext = masterContext;
+    this.session = session;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    dispatcher = new AsyncDispatcher();
+    this.addService(dispatcher);
+
+    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+    super.init(conf);
+  }
+
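+  /** Asks the QueryMaster to kill this query; a no-op if the RPC client is not connected yet. */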
+  public synchronized void kill() {
+    if(queryMasterRpcClient != null){
+      queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+    }
+  }
+
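+  /**
+   * Stops the QueryMaster, waiting up to 60 seconds for it to terminate,
+   * then releases the RPC connection and appends this query to the history.
+   */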
+  @Override
+  public void stop() {
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().stopQueryMaster(queryId);
+
+    long startTime = System.currentTimeMillis();
+    while(true) {
+      try {
+        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+          LOG.info(queryId + " QueryMaster stopped");
+          break;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        break;
+      }
+
+      try {
+        synchronized (this){
+          wait(100);
+        }
+      } catch (InterruptedException e) {
+        break;
+      }
+      if(System.currentTimeMillis() - startTime > 60 * 1000) {
+        LOG.warn("Failed to stop QueryMaster:" + queryId);
+        break;
+      }
+    }
+
+    if(queryMasterRpc != null) {
+      RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc);
+    }
+
+    masterContext.getHistoryWriter().appendHistory(queryInfo);
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
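+  /**
+   * Allocates a QueryMaster from the resource manager; on success, records
+   * its connection info and fires a QUERY_MASTER_START event.
+   */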
+  public boolean startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerAllocatedResource resource = resourceManager.allocateQueryMaster(this);
+
+      // no resource is currently available to run a QueryMaster
+      if(resource == null) {
+        LOG.info("No Available Resources for QueryMaster");
+        return false;
+      }
+
+      queryInfo.setQueryMaster(resource.getConnectionInfo().getHost());
+      queryInfo.setQueryMasterPort(resource.getConnectionInfo().getQueryMasterPort());
+      queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort());
+      queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort());
+
+      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+
+      return true;
+    } catch (Exception e) {
+      catchException(e);
+      return false;
+    }
+  }
+
+  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent queryJobEvent) {
+      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        heartbeat(queryJobEvent.getQueryInfo());
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+        queryInProgress.getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
+        submitQueryToMaster();
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        kill();
+      }
+    }
+  }
+
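+  /** Opens a pooled RPC connection to the allocated QueryMaster. */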
+  private void connectQueryMaster() throws Exception {
+    InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort());
+    LOG.info("Connect to QueryMaster:" + addr);
+    queryMasterRpc =
+        RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true);
+    queryMasterRpcClient = queryMasterRpc.getStub();
+  }
+
+  private synchronized void submitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+
+      QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder();
+      builder.setQueryId(queryId.getProto())
+          .setQueryContext(queryInfo.getQueryContext().getProto())
+          .setSession(session.getProto())
+          .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr()))
+          .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build());
+
+      queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public boolean isStarted() {
+    return !stopped.get() && this.querySubmitted.get();
+  }
+
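+  /**
+   * Merges a heartbeat from the QueryMaster into the local QueryInfo and
+   * fires QUERY_JOB_STOP once the query reaches a terminal state.
+   */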
+  private void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+
+    // synchronize to avoid partial updates from concurrent heartbeats
+    synchronized (this.queryInfo) {
+
+      // a terminal state lets the client retrieve the query result,
+      // so the result descriptor must be set before the query state changes
+      if (isFinishState(queryInfo.getQueryState())) {
+        if (queryInfo.hasResultdesc()) {
+          this.queryInfo.setResultDesc(queryInfo.getResultDesc());
+        }
+      }
+
+      this.queryInfo.setQueryState(queryInfo.getQueryState());
+      this.queryInfo.setProgress(queryInfo.getProgress());
+      this.queryInfo.setFinishTime(queryInfo.getFinishTime());
+
+      // Update diagnosis message
+      if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+        this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+        LOG.info(queryId + ", " + queryInfo.getLastMessage());
+      }
+
+      // if the query failed, log the error message
+      if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+        LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+      }
+
+      if (isFinishState(this.queryInfo.getQueryState())) {
+        masterContext.getQueryJobManager().getEventHandler().handle(
+            new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo));
+      }
+    }
+  }
+
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/1c29c1cb/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
new file mode 100644
index 0000000..1a1f2ff
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.querymaster;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.master.QueryInfo;
+
+public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
+  private QueryInfo queryInfo;
+
+  public QueryJobEvent(Type type, QueryInfo queryInfo) {
+    super(type);
+
+    this.queryInfo = queryInfo;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  public enum Type {
+    QUERY_JOB_START,
+    QUERY_JOB_HEARTBEAT,
+    QUERY_JOB_FINISH,
+    QUERY_JOB_STOP,
+    QUERY_MASTER_START,
+    QUERY_MASTER_STOP,
+    QUERY_JOB_KILL
+  }
+}